ACCUMULO-3638 mostly updated file references to WALs to be Path objects
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/98c3cef8 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/98c3cef8 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/98c3cef8 Branch: refs/heads/master Commit: 98c3cef8ccfccbe84a5c35ce7576a9225ee03051 Parents: 902ee7d Author: Eric C. Newton <eric.new...@gmail.com> Authored: Tue Mar 10 14:07:43 2015 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Tue Mar 10 14:07:43 2015 -0400 ---------------------------------------------------------------------- .../server/master/state/MetaDataStateStore.java | 17 +- .../master/state/TabletLocationState.java | 3 + .../server/master/state/TabletStateStore.java | 7 +- .../master/state/ZooTabletStateStore.java | 11 +- .../accumulo/server/util/MetadataTableUtil.java | 31 ++- .../gc/GarbageCollectWriteAheadLogs.java | 79 +++--- .../accumulo/master/TabletGroupWatcher.java | 15 +- .../apache/accumulo/tserver/TabletServer.java | 6 +- .../apache/accumulo/tserver/log/DfsLogger.java | 6 +- .../accumulo/test/functional/WALSunnyDayIT.java | 240 +++++++++++++++++++ 10 files changed, 334 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java index 1749904..decc8c7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java @@ -32,6 +32,7 @@ import 
org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; @@ -123,7 +124,7 @@ public class MetaDataStateStore extends TabletStateStore { } @Override - public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<String>> logsForDeadServers) throws DistributedStoreException { + public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException { BatchWriter writer = createBatchWriter(); try { @@ -136,10 +137,10 @@ public class MetaDataStateStore extends TabletStateStore { tls.future.clearFutureLocation(m); } if (logsForDeadServers != null) { - List<String> logs = logsForDeadServers.get(tls.futureOrCurrent()); + List<Path> logs = logsForDeadServers.get(tls.futureOrCurrent()); if (logs != null) { - for (String log : logs) { - LogEntry entry = new LogEntry(tls.extent, 0, tls.futureOrCurrent().hostPort(), log); + for (Path log : logs) { + LogEntry entry = new LogEntry(tls.extent, 0, tls.futureOrCurrent().hostPort(), log.toString()); m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue()); } } @@ -163,13 +164,13 @@ public class MetaDataStateStore extends TabletStateStore { } @Override - public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<String>> logs) throws DistributedStoreException { + public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) throws DistributedStoreException { BatchWriter writer = createBatchWriter(); try { - for (Entry<TServerInstance,List<String>> entry : logs.entrySet()) { + for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) { Mutation m = new 
Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + entry.getKey().toString()); - for (String log : entry.getValue()) { - m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(log), MetadataSchema.CurrentLogsSection.UNUSED); + for (Path log : entry.getValue()) { + m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(log.toString()), MetadataSchema.CurrentLogsSection.UNUSED); } writer.addMutation(m); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java index a222532..3ece3c9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.accumulo.core.data.KeyExtent; import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; /** * When a tablet is assigned, we mark its future location. When the tablet is opened, we set its current location. 
A tablet should never have both a future and @@ -32,6 +33,8 @@ import org.apache.hadoop.io.Text; */ public class TabletLocationState { + private static final Logger log = Logger.getLogger(TabletLocationState.class); + static public class BadLocationStateException extends Exception { private static final long serialVersionUID = 1L; private Text metadataTableEntry; http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java index 13db05b..acc10d8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.hadoop.fs.Path; /** * Interface for storing information about tablet assignments. 
There are three implementations: @@ -61,9 +62,9 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState> * @param logsForDeadServers * a cache of logs in use by servers when they died */ - abstract public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<String>> logsForDeadServers) throws DistributedStoreException; + abstract public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException; - public static void unassign(AccumuloServerContext context, TabletLocationState tls, Map<TServerInstance, List<String>> logsForDeadServers) throws DistributedStoreException { + public static void unassign(AccumuloServerContext context, TabletLocationState tls, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException { TabletStateStore store; if (tls.extent.isRootTablet()) { store = new ZooTabletStateStore(); @@ -90,6 +91,6 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState> /** * When a server fails, its logs must be marked as unused after the log markers are moved to the tablets. 
*/ - abstract public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance, List<String>> logs) throws DistributedStoreException; + abstract public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance, List<Path>> logs) throws DistributedStoreException; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java index 66bad4e..eca8e7f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java @@ -31,6 +31,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; import com.google.common.net.HostAndPort; @@ -163,17 +164,17 @@ public class ZooTabletStateStore extends TabletStateStore { } @Override - public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<String>> logsForDeadServers) throws DistributedStoreException { + public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException { if (tablets.size() != 1) throw new IllegalArgumentException("There is only one root tablet"); TabletLocationState tls = tablets.iterator().next(); if (tls.extent.compareTo(RootTable.EXTENT) != 0) throw new IllegalArgumentException("You can only store the root tablet location"); if 
(logsForDeadServers != null) { - List<String> logs = logsForDeadServers.get(tls.futureOrCurrent()); + List<Path> logs = logsForDeadServers.get(tls.futureOrCurrent()); if (logs != null) { - for (String entry : logs) { - LogEntry logEntry = new LogEntry(RootTable.EXTENT, System.currentTimeMillis(), tls.futureOrCurrent().getLocation().toString(), entry); + for (Path entry : logs) { + LogEntry logEntry = new LogEntry(RootTable.EXTENT, System.currentTimeMillis(), tls.futureOrCurrent().getLocation().toString(), entry.toString()); byte[] value; try { value = logEntry.toBytes(); @@ -196,7 +197,7 @@ public class ZooTabletStateStore extends TabletStateStore { } @Override - public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<String>> logs) { + public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) { // the root table is not replicated, so unassigning the root tablet has removed the current log marker } http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index ebf4b1b..96f9c9e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -89,7 +89,6 @@ import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.tablets.TabletTime; import org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -1061,22 
+1060,21 @@ public class MetadataTableUtil { return tabletEntries; } - public static void addNewLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final String filename, KeyExtent extent) { + public static void addNewLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename, KeyExtent extent) { log.debug("Adding log entry " + filename); if (extent.isRootTablet()) { retryZooKeeperUpdate(context, zooLock, new ZooOperation() { @Override public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS; - String[] parts = StringUtils.split(filename, '/'); - String uniqueId = parts[parts.length - 1]; + String uniqueId = filename.getName(); String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId; - rw.putPersistentData(path, filename.getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); + rw.putPersistentData(path, filename.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); } }); } else { Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString()); - m.put(CurrentLogsSection.COLF, new Text(filename), new Value(EMPTY_BYTES)); + m.put(CurrentLogsSection.COLF, new Text(filename.toString()), new Value(EMPTY_BYTES)); String tableName = MetadataTable.NAME; if (extent.isMeta()) { tableName = RootTable.NAME; @@ -1091,13 +1089,12 @@ public class MetadataTableUtil { } } - private static void removeCurrentRootLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final String filename) { + private static void removeCurrentRootLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename) { retryZooKeeperUpdate(context, zooLock, new ZooOperation() { @Override public void run(IZooReaderWriter rw) throws KeeperException, 
InterruptedException, IOException { String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS; - String[] parts = StringUtils.split(filename, '/'); - String uniqueId = parts[parts.length - 1]; + String uniqueId = filename.getName(); String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId; log.debug("Removing entry " + path + " from zookeeper"); rw.recursiveDelete(path, NodeMissingPolicy.SKIP); @@ -1105,12 +1102,12 @@ public class MetadataTableUtil { }); } - public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<String> all) throws AccumuloException { + public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<Path> all) throws AccumuloException { try { BatchWriter root = context.getConnector().createBatchWriter(RootTable.NAME, null); BatchWriter meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null); - for (String fname : all) { - Text tname = new Text(fname.getBytes(UTF_8)); + for (Path fname : all) { + Text tname = new Text(fname.toString()); Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString()); m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname); root.addMutation(m); @@ -1127,21 +1124,21 @@ public class MetadataTableUtil { } } - public static void fetchLogsForDeadServer(ClientContext context, ZooLock lock, KeyExtent extent, TServerInstance server, Map<TServerInstance,List<String>> logsForDeadServers) + public static void fetchLogsForDeadServer(ClientContext context, ZooLock lock, KeyExtent extent, TServerInstance server, Map<TServerInstance,List<Path>> logsForDeadServers) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { // already cached if (logsForDeadServers.containsKey(server)) { return; } if (extent.isRootTablet()) { - final List<String> logs = new ArrayList<>(); + final 
List<Path> logs = new ArrayList<>(); retryZooKeeperUpdate(context, lock, new ZooOperation() { @Override public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS; logs.clear(); for (String child : rw.getChildren(root)) { - logs.add(new String(rw.getData(root + "/" + child, null), UTF_8)); + logs.add(new Path(new String(rw.getData(root + "/" + child, null), UTF_8))); } } }); @@ -1155,9 +1152,9 @@ public class MetadataTableUtil { // fetch the current logs in use, and put them in the cache Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY); scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString())); - List<String> logs = new ArrayList<>(); + List<Path> logs = new ArrayList<>(); for (Entry<Key,Value> entry : scanner) { - logs.add(entry.getKey().getColumnQualifier().toString()); + logs.add(new Path(entry.getKey().getColumnQualifier().toString())); } logsForDeadServers.put(server, logs); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index a7703e9..444789b 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -81,6 +81,7 @@ public class GarbageCollectWriteAheadLogs { private final AccumuloServerContext context; private final VolumeManager fs; private final boolean useTrash; + private final LiveTServerSet liveServers; /** * Creates a new GC WAL object. 
@@ -96,24 +97,26 @@ public class GarbageCollectWriteAheadLogs { this.context = context; this.fs = fs; this.useTrash = useTrash; + this.liveServers = new LiveTServerSet(context, new Listener() { + @Override + public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) { + log.debug("New tablet servers noticed: " + added); + log.debug("Tablet servers removed: " + deleted); + } + }); + liveServers.startListeningForTabletServerChanges(); } public void collect(GCStatus status) { Span span = Trace.start("getCandidates"); try { - LiveTServerSet liveServers = new LiveTServerSet(context, new Listener() { - @Override - public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) { - log.debug("New tablet servers noticed: " + added); - log.debug("Tablet servers removed: " + deleted); - } - }); Set<TServerInstance> currentServers = liveServers.getCurrentServers(); + status.currentLog.started = System.currentTimeMillis(); - Map<TServerInstance, Set<String> > candidates = new HashMap<>(); + Map<TServerInstance, Set<Path> > candidates = new HashMap<>(); long count = getCurrent(candidates, currentServers); long fileScanStop = System.currentTimeMillis(); @@ -174,15 +177,15 @@ public class GarbageCollectWriteAheadLogs { } } - private long removeMarkers(Map<TServerInstance,Set<String>> candidates) { + private long removeMarkers(Map<TServerInstance,Set<Path>> candidates) { long result = 0; try { BatchWriter root = context.getConnector().createBatchWriter(RootTable.NAME, null); BatchWriter meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null); - for (Entry<TServerInstance,Set<String>> entry : candidates.entrySet()) { - Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.toString()); - for (String wal : entry.getValue()) { - m.putDelete(CurrentLogsSection.COLF, new Text(wal)); + for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) { + Mutation m = new 
Mutation(CurrentLogsSection.getRowPrefix() + entry.getKey().toString()); + for (Path path : entry.getValue()) { + m.putDelete(CurrentLogsSection.COLF, new Text(path.toString())); result++; } root.addMutation(m); @@ -196,11 +199,10 @@ public class GarbageCollectWriteAheadLogs { return result; } - private long removeFiles(Map<TServerInstance, Set<String> > candidates, final GCStatus status) { - for (Entry<TServerInstance,Set<String>> entry : candidates.entrySet()) { - for (String walog : entry.getValue()) { - log.debug("Removing WAL for offline server " + entry.getKey() + " log " + walog); - Path path = new Path(walog); + private long removeFiles(Map<TServerInstance, Set<Path> > candidates, final GCStatus status) { + for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) { + for (Path path : entry.getValue()) { + log.debug("Removing unused WAL for server " + entry.getKey() + " log " + path); try { if (!useTrash || !fs.moveToTrash(path)) fs.deleteRecursively(path); @@ -215,14 +217,14 @@ public class GarbageCollectWriteAheadLogs { return status.currentLog.deleted; } - private long removeMetadataEntries(Map<TServerInstance, Set<String> > candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException, KeeperException, + private long removeMetadataEntries(Map<TServerInstance, Set<Path> > candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException, KeeperException, InterruptedException { // remove any entries if there's a log reference, or a tablet is still assigned to the dead server - Map<String, TServerInstance> walToDeadServer = new HashMap<>(); - for (Entry<TServerInstance,Set<String>> entry : candidates.entrySet()) { - for (String file : entry.getValue()) { + Map<Path, TServerInstance> walToDeadServer = new HashMap<>(); + for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) { + for (Path file : entry.getValue()) { walToDeadServer.put(file, entry.getKey()); } } @@ -248,7 +250,7 @@ public class 
GarbageCollectWriteAheadLogs { return count; } - protected int removeReplicationEntries(Map<TServerInstance, Set<String> > candidates, GCStatus status) throws IOException, KeeperException, + protected int removeReplicationEntries(Map<TServerInstance, Set<Path> > candidates, GCStatus status) throws IOException, KeeperException, InterruptedException { Connector conn; try { @@ -260,13 +262,13 @@ public class GarbageCollectWriteAheadLogs { int count = 0; - Iterator<Entry<TServerInstance,Set<String>>> walIter = candidates.entrySet().iterator(); + Iterator<Entry<TServerInstance,Set<Path>>> walIter = candidates.entrySet().iterator(); while (walIter.hasNext()) { - Entry<TServerInstance,Set<String>> wal = walIter.next(); - Iterator<String> paths = wal.getValue().iterator(); + Entry<TServerInstance,Set<Path>> wal = walIter.next(); + Iterator<Path> paths = wal.getValue().iterator(); while (paths.hasNext()) { - String fullPath = paths.next(); + Path fullPath = paths.next(); if (neededByReplication(conn, fullPath)) { log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath); // If we haven't already removed it, check to see if this WAL is @@ -294,7 +296,7 @@ public class GarbageCollectWriteAheadLogs { * The full path (URI) * @return True if the WAL is still needed by replication (not a candidate for deletion) */ - protected boolean neededByReplication(Connector conn, String wal) { + protected boolean neededByReplication(Connector conn, Path wal) { log.info("Checking replication table for " + wal); Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal); @@ -317,7 +319,7 @@ public class GarbageCollectWriteAheadLogs { return false; } - protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) { + protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, Path wal) { Scanner metaScanner; try { metaScanner = conn.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY); @@ -337,7 +339,7 @@ public class GarbageCollectWriteAheadLogs { StatusSection.limit(replScanner); // Only look for this specific WAL - replScanner.setRange(Range.exact(wal)); + replScanner.setRange(Range.exact(wal.toString())); return Iterables.concat(metaScanner, replScanner); } catch (ReplicationTableOfflineException e) { @@ -353,19 +355,19 @@ public class GarbageCollectWriteAheadLogs { /** * Scans log markers. The map passed in is populated with the logs for dead servers. * - * @param logsForDeadServers + * @param unusedLogs * map of dead server to log file entries * @return total number of log files */ - private long getCurrent(Map<TServerInstance, Set<String> > logsForDeadServers, Set<TServerInstance> currentServers) throws Exception { - Set<String> rootWALs = new HashSet<String>(); + private long getCurrent(Map<TServerInstance, Set<Path> > unusedLogs, Set<TServerInstance> currentServers) throws Exception { + Set<Path> rootWALs = new HashSet<>(); // Get entries in zookeeper: String zpath = ZooUtil.getRoot(context.getInstance()) + RootTable.ZROOT_TABLET_WALOGS; ZooReaderWriter zoo = ZooReaderWriter.getInstance(); List<String> children = zoo.getChildren(zpath); for (String child : children) { LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, null)); - rootWALs.add(entry.filename); + rootWALs.add(new Path(entry.filename)); } long count = 0; @@ -383,12 +385,13 @@ public class GarbageCollectWriteAheadLogs { CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId); CurrentLogsSection.getPath(entry.getKey(), filename); TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString()); - if ((!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED)) && !rootWALs.contains(filename)) { - Set<String> logs = logsForDeadServers.get(tsi); + Path path = new Path(filename.toString()); + if ((!currentServers.contains(tsi) || 
(entry.getValue().equals(CurrentLogsSection.UNUSED)) && !rootWALs.contains(path))) { + Set<Path> logs = unusedLogs.get(tsi); if (logs == null) { - logsForDeadServers.put(tsi, logs = new HashSet<String>()); + unusedLogs.put(tsi, logs = new HashSet<Path>()); } - if (logs.add(new Path(filename.toString()).toString())) { + if (logs.add(path)) { count++; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java index edea93f..4d237a1 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java +++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java @@ -161,7 +161,7 @@ class TabletGroupWatcher extends Daemon { List<Assignment> assigned = new ArrayList<Assignment>(); List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>(); Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>(); - Map<TServerInstance, List<String>> logsForDeadServers = new TreeMap<>(); + Map<TServerInstance, List<Path>> logsForDeadServers = new TreeMap<>(); MasterState masterState = master.getMasterState(); int[] counts = new int[TabletState.values().length]; @@ -204,8 +204,9 @@ class TabletGroupWatcher extends Daemon { TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo()); TServerInstance server = tls.getServer(); TabletState state = tls.getState(currentTServers.keySet()); - if (Master.log.isTraceEnabled()) - Master.log.trace("Goal state " + goal + " current " + state); + if (Master.log.isTraceEnabled()) { + Master.log.trace("Goal state " + goal + " current " + state + " for " + tls.extent); + } stats.update(tableId, state); 
mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty()); sendChopRequest(mergeStats.getMergeInfo(), state, tls); @@ -308,8 +309,10 @@ class TabletGroupWatcher extends Daemon { updateMergeState(mergeStatsCache); - Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.)); - eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); + if (this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) { + Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.)); + eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); + } } catch (Exception ex) { Master.log.error("Error processing table state for store " + store.name(), ex); if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) { @@ -731,7 +734,7 @@ class TabletGroupWatcher extends Daemon { List<Assignment> assignments, List<Assignment> assigned, List<TabletLocationState> assignedToDeadServers, - Map<TServerInstance, List<String>> logsForDeadServers, + Map<TServerInstance, List<Path>> logsForDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException { if (!assignedToDeadServers.isEmpty()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 3b7ff03..ffc1c2a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -3000,9 +3000,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable { 
candidates.removeAll(tablet.getCurrentLogFiles()); } try { - Set<String> filenames = new HashSet<>(); + Set<Path> filenames = new HashSet<>(); for (DfsLogger candidate : candidates) { - filenames.add(candidate.getFileName()); + filenames.add(candidate.getPath()); } MetadataTableUtil.markLogUnused(this, this.getLock(), this.getTabletSession(), filenames); synchronized (closedLogs) { @@ -3019,7 +3019,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { EnumSet<TabletLevel> set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level)); if (set == null || !set.contains(level) || level == TabletLevel.ROOT) { log.info("Writing log marker for level " + level + " " + copy.getFileName()); - MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(), copy.getFileName(), extent); + MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(), copy.getPath(), extent); if (set != null) { set.add(level); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index f8bcfbc..e256604 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -475,7 +475,11 @@ public class DfsLogger implements Comparable<DfsLogger> { } public String getFileName() { - return logPath.toString(); + return logPath; + } + + public Path getPath() { + return new Path(logPath); } public void close() throws IOException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java 
---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java new file mode 100644 index 0000000..b8e36bc --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.core.conf.Property.GC_CYCLE_DELAY; +import static org.apache.accumulo.core.conf.Property.GC_CYCLE_START; +import static org.apache.accumulo.core.conf.Property.INSTANCE_ZK_TIMEOUT; +import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE; +import static org.apache.accumulo.core.conf.Property.TSERV_WAL_REPLICATION; +import static org.apache.accumulo.core.security.Authorizations.EMPTY; +import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR; +import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.master.state.SetGoalState; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import 
org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * End-to-end check of write-ahead-log bookkeeping on a single-tserver mini cluster: WAL markers
+ * appear when data is written, a log roll adds a marker, flushing leaves the rolled WAL unused,
+ * the garbage collector removes the unused marker, and after a tserver restart in SAFE_MODE the
+ * user tablet carries recovery (log) markers.
+ */
+public class WALSunnyDayIT extends ConfigurableMacIT {
+
+  /** Empty column family used for every mutation written by {@link #writeSomeData}. */
+  private static final Text CF = new Text(new byte[0]);
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // run the GC often so unused WALs are collected within the test's sleep windows
+    cfg.setProperty(GC_CYCLE_DELAY, "1s");
+    cfg.setProperty(GC_CYCLE_START, "0s");
+    // small WALs, presumably so the large write in test() forces a roll
+    cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M");
+    cfg.setProperty(TSERV_WAL_REPLICATION, "1");
+    cfg.setProperty(INSTANCE_ZK_TIMEOUT, "3s");
+    cfg.setNumTservers(1);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  /** Returns how many entries of {@code bools} are {@code true}. */
+  int countTrue(Collection<Boolean> bools) {
+    int result = 0;
+    for (Boolean b : bools) {
+      if (b.booleanValue()) {
+        result++;
+      }
+    }
+    return result;
+  }
+
+  @Test
+  public void test() throws Exception {
+    MiniAccumuloClusterImpl mac = getCluster();
+    MiniAccumuloClusterControl control = mac.getClusterControl();
+    // keep the GC off until the test explicitly wants unused WALs collected
+    control.stop(GARBAGE_COLLECTOR);
+    Connector c = getConnector();
+    ZooKeeper zoo = new ZooKeeper(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut(), new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        log.info(event.toString());
+      }
+    });
+    try {
+      String tableName = getUniqueNames(1)[0];
+      c.tableOperations().create(tableName);
+      writeSomeData(c, tableName, 1, 1);
+
+      // should have two markers: wal in use, and next wal
+      Map<String,Boolean> wals = getWals(c, zoo);
+      assertEquals(wals.toString(), 2, wals.size());
+      for (Boolean b : wals.values()) {
+        assertTrue("logs should be in use", b.booleanValue());
+      }
+
+      // roll log, get a new next
+      writeSomeData(c, tableName, 1000, 50);
+      Map<String,Boolean> walsAfterRoll = getWals(c, zoo);
+      assertEquals("should have 3 WALs after roll", 3, walsAfterRoll.size());
+      assertTrue("new WALs should be a superset of the old WALs", walsAfterRoll.keySet().containsAll(wals.keySet()));
+      assertEquals("all WALs should be in use", 3, countTrue(walsAfterRoll.values()));
+
+      // flush the tables
+      for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) {
+        c.tableOperations().flush(table, null, null, true);
+      }
+      // rolled WAL is no longer in use, but needs to be GC'd
+      Map<String,Boolean> walsAfterflush = getWals(c, zoo);
+      assertEquals(walsAfterflush.toString(), 3, walsAfterflush.size());
+      assertEquals("inUse should be 2", 2, countTrue(walsAfterflush.values()));
+
+      // let the GC run for a little bit
+      control.start(GARBAGE_COLLECTOR);
+      UtilWaitThread.sleep(5 * 1000);
+      // make sure the unused WAL goes away
+      Map<String,Boolean> walsAfterGC = getWals(c, zoo);
+      assertEquals(walsAfterGC.toString(), 2, walsAfterGC.size());
+      control.stop(GARBAGE_COLLECTOR);
+      // restart the tserver, but don't run recovery on all tablets
+      control.stop(TABLET_SERVER);
+      // this delays recovery on the normal tables
+      assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").waitFor());
+      control.start(TABLET_SERVER);
+
+      // wait for the metadata table to go back online
+      getRecoveryMarkers(c);
+      // allow a little time for the master to notice ASSIGNED_TO_DEAD_SERVER tablets
+      UtilWaitThread.sleep(5 * 1000);
+      Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
+      assertEquals("one tablet should have markers", 1, markers.keySet().size());
+      // JUnit's assertEquals is (message, expected, actual)
+      assertEquals("tableId of the keyExtent should be 1", new Text("1"), markers.keySet().iterator().next().getTableId());
+
+      // put some data in the WAL
+      assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
+      writeSomeData(c, tableName, 100, 100);
+
+      Map<String,Boolean> walsAfterRestart = getWals(c, zoo);
+      assertEquals("used WALs after restart should be 2", 2, countTrue(walsAfterRestart.values()));
+      control.start(GARBAGE_COLLECTOR);
+      UtilWaitThread.sleep(5 * 1000);
+      Map<String,Boolean> walsAfterRestartAndGC = getWals(c, zoo);
+      assertEquals("wals left should be 2", 2, walsAfterRestartAndGC.size());
+      assertEquals("logs in use should be 2", 2, countTrue(walsAfterRestartAndGC.values()));
+    } finally {
+      // release the ZooKeeper session even when an assertion above fails
+      zoo.close();
+    }
+  }
+
+  /**
+   * Writes {@code row} mutations with random 10-byte row ids, each carrying {@code col} random
+   * 10-byte qualifier/value pairs under the empty column family. The writer is flushed after
+   * row 0 and every 100th row thereafter, and is always closed.
+   */
+  private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception {
+    Random rand = new Random();
+    BatchWriter bw = conn.createBatchWriter(tableName, null);
+    try {
+      byte[] rowData = new byte[10];
+      byte[] cq = new byte[10];
+      byte[] value = new byte[10];
+
+      for (int r = 0; r < row; r++) {
+        rand.nextBytes(rowData);
+        Mutation m = new Mutation(rowData);
+        for (int c = 0; c < col; c++) {
+          rand.nextBytes(cq);
+          rand.nextBytes(value);
+          m.put(CF, new Text(cq), new Value(value));
+        }
+        bw.addMutation(m);
+        if (r % 100 == 0) {
+          bw.flush();
+        }
+      }
+    } finally {
+      bw.close();
+    }
+  }
+
+  /**
+   * Collects the WAL markers visible to the test, keyed by WAL path. Markers come from the
+   * current-logs section of the root and metadata tables, mapped to {@code true} when the stored
+   * value is empty (the test treats {@code true} as "in use"). Markers also come from the root
+   * tablet's current-logs children in ZooKeeper, which are always mapped to {@code true}.
+   */
+  private Map<String,Boolean> getWals(Connector c, ZooKeeper zoo) throws Exception {
+    Map<String,Boolean> result = new HashMap<>();
+    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
+    root.setRange(CurrentLogsSection.getRange());
+    Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
+    meta.setRange(root.getRange());
+    Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
+    while (both.hasNext()) {
+      Entry<Key,Value> entry = both.next();
+      Text path = new Text();
+      CurrentLogsSection.getPath(entry.getKey(), path);
+      result.put(path.toString(), entry.getValue().get().length == 0);
+    }
+    String zpath = ZooUtil.getRoot(c.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+    List<String> children = zoo.getChildren(zpath, null);
+    for (String child : children) {
+      byte[] data = zoo.getData(zpath + "/" + child, null, null);
+      // decode explicitly instead of with the platform default charset;
+      // NOTE(review): assumes the writer stored the path as UTF-8 - confirm
+      result.put(new String(data, java.nio.charset.StandardCharsets.UTF_8), true);
+    }
+    return result;
+  }
+
+  /**
+   * Maps each tablet that has log (recovery) entries in the root or metadata table to the list of
+   * those log column qualifiers. Log entries seen for a row are attached to the extent built when
+   * that row's prev-row column is reached. This relies on the log column family sorting before the
+   * prev-row column within a tablet row - presumably true for the metadata schema; confirm.
+   */
+  private Map<KeyExtent,List<String>> getRecoveryMarkers(Connector c) throws Exception {
+    Map<KeyExtent,List<String>> result = new HashMap<>();
+    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
+    root.setRange(TabletsSection.getRange());
+    root.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+    TabletColumnFamily.PREV_ROW_COLUMN.fetch(root);
+
+    Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
+    meta.setRange(TabletsSection.getRange());
+    meta.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+    TabletColumnFamily.PREV_ROW_COLUMN.fetch(meta);
+
+    List<String> logs = new ArrayList<>();
+    Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
+    while (both.hasNext()) {
+      Entry<Key,Value> entry = both.next();
+      Key key = entry.getKey();
+      if (key.getColumnFamily().equals(TabletsSection.LogColumnFamily.NAME)) {
+        logs.add(key.getColumnQualifier().toString());
+      }
+      if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && !logs.isEmpty()) {
+        KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue());
+        result.put(extent, logs);
+        logs = new ArrayList<>();
+      }
+    }
+    return result;
+  }
+
+}