ACCUMULO-3638 mostly updated file references to WALs to be Path objects

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/98c3cef8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/98c3cef8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/98c3cef8

Branch: refs/heads/master
Commit: 98c3cef8ccfccbe84a5c35ce7576a9225ee03051
Parents: 902ee7d
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Tue Mar 10 14:07:43 2015 -0400
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Tue Mar 10 14:07:43 2015 -0400

----------------------------------------------------------------------
 .../server/master/state/MetaDataStateStore.java |  17 +-
 .../master/state/TabletLocationState.java       |   3 +
 .../server/master/state/TabletStateStore.java   |   7 +-
 .../master/state/ZooTabletStateStore.java       |  11 +-
 .../accumulo/server/util/MetadataTableUtil.java |  31 ++-
 .../gc/GarbageCollectWriteAheadLogs.java        |  79 +++---
 .../accumulo/master/TabletGroupWatcher.java     |  15 +-
 .../apache/accumulo/tserver/TabletServer.java   |   6 +-
 .../apache/accumulo/tserver/log/DfsLogger.java  |   6 +-
 .../accumulo/test/functional/WALSunnyDayIT.java | 240 +++++++++++++++++++
 10 files changed, 334 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 1749904..decc8c7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
@@ -123,7 +124,7 @@ public class MetaDataStateStore extends TabletStateStore {
   }
 
   @Override
-  public void unassign(Collection<TabletLocationState> tablets, 
Map<TServerInstance, List<String>> logsForDeadServers) throws 
DistributedStoreException {
+  public void unassign(Collection<TabletLocationState> tablets, 
Map<TServerInstance, List<Path>> logsForDeadServers) throws 
DistributedStoreException {
 
     BatchWriter writer = createBatchWriter();
     try {
@@ -136,10 +137,10 @@ public class MetaDataStateStore extends TabletStateStore {
           tls.future.clearFutureLocation(m);
         }
         if (logsForDeadServers != null) {
-          List<String> logs = logsForDeadServers.get(tls.futureOrCurrent());
+          List<Path> logs = logsForDeadServers.get(tls.futureOrCurrent());
           if (logs != null) {
-            for (String log : logs) {
-              LogEntry entry = new LogEntry(tls.extent, 0, 
tls.futureOrCurrent().hostPort(), log);
+            for (Path log : logs) {
+              LogEntry entry = new LogEntry(tls.extent, 0, 
tls.futureOrCurrent().hostPort(), log.toString());
               m.put(entry.getColumnFamily(), entry.getColumnQualifier(), 
entry.getValue());
             }
           }
@@ -163,13 +164,13 @@ public class MetaDataStateStore extends TabletStateStore {
   }
 
   @Override
-  public void markLogsAsUnused(AccumuloServerContext context, 
Map<TServerInstance,List<String>> logs) throws DistributedStoreException {
+  public void markLogsAsUnused(AccumuloServerContext context, 
Map<TServerInstance,List<Path>> logs) throws DistributedStoreException {
     BatchWriter writer = createBatchWriter();
     try {
-      for (Entry<TServerInstance,List<String>> entry : logs.entrySet()) {
+      for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) {
         Mutation m = new 
Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + 
entry.getKey().toString());
-        for (String log : entry.getValue()) {
-          m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(log), 
MetadataSchema.CurrentLogsSection.UNUSED);
+        for (Path log : entry.getValue()) {
+          m.put(MetadataSchema.CurrentLogsSection.COLF, new 
Text(log.toString()), MetadataSchema.CurrentLogsSection.UNUSED);
         }
         writer.addMutation(m);
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
index a222532..3ece3c9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -22,6 +22,7 @@ import java.util.Set;
 
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 
 /**
  * When a tablet is assigned, we mark its future location. When the tablet is 
opened, we set its current location. A tablet should never have both a future 
and
@@ -32,6 +33,8 @@ import org.apache.hadoop.io.Text;
  */
 public class TabletLocationState {
 
+  private static final Logger log = 
Logger.getLogger(TabletLocationState.class);
+
   static public class BadLocationStateException extends Exception {
     private static final long serialVersionUID = 1L;
     private Text metadataTableEntry;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
index 13db05b..acc10d8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Interface for storing information about tablet assignments. There are three 
implementations:
@@ -61,9 +62,9 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
    * @param logsForDeadServers
    *          a cache of logs in use by servers when they died
    */
-  abstract public void unassign(Collection<TabletLocationState> tablets, 
Map<TServerInstance, List<String>> logsForDeadServers) throws 
DistributedStoreException;
+  abstract public void unassign(Collection<TabletLocationState> tablets, 
Map<TServerInstance, List<Path>> logsForDeadServers) throws 
DistributedStoreException;
 
-  public static void unassign(AccumuloServerContext context, 
TabletLocationState tls, Map<TServerInstance, List<String>> logsForDeadServers) 
throws DistributedStoreException {
+  public static void unassign(AccumuloServerContext context, 
TabletLocationState tls, Map<TServerInstance, List<Path>> logsForDeadServers) 
throws DistributedStoreException {
     TabletStateStore store;
     if (tls.extent.isRootTablet()) {
       store = new ZooTabletStateStore();
@@ -90,6 +91,6 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
   /**
    * When a server fails, its logs must be marked as unused after the log 
markers are moved to the tablets.
    */
-  abstract public void markLogsAsUnused(AccumuloServerContext context, 
Map<TServerInstance, List<String>> logs) throws DistributedStoreException;
+  abstract public void markLogsAsUnused(AccumuloServerContext context, 
Map<TServerInstance, List<Path>> logs) throws DistributedStoreException;
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index 66bad4e..eca8e7f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
 import com.google.common.net.HostAndPort;
@@ -163,17 +164,17 @@ public class ZooTabletStateStore extends TabletStateStore {
   }
 
   @Override
-  public void unassign(Collection<TabletLocationState> tablets, 
Map<TServerInstance, List<String>> logsForDeadServers) throws 
DistributedStoreException {
+  public void unassign(Collection<TabletLocationState> tablets, 
Map<TServerInstance, List<Path>> logsForDeadServers) throws 
DistributedStoreException {
     if (tablets.size() != 1)
       throw new IllegalArgumentException("There is only one root tablet");
     TabletLocationState tls = tablets.iterator().next();
     if (tls.extent.compareTo(RootTable.EXTENT) != 0)
       throw new IllegalArgumentException("You can only store the root tablet 
location");
     if (logsForDeadServers != null) {
-      List<String> logs = logsForDeadServers.get(tls.futureOrCurrent());
+      List<Path> logs = logsForDeadServers.get(tls.futureOrCurrent());
       if (logs != null) {
-        for (String entry : logs) {
-          LogEntry logEntry = new LogEntry(RootTable.EXTENT, 
System.currentTimeMillis(), tls.futureOrCurrent().getLocation().toString(), 
entry);
+        for (Path entry : logs) {
+          LogEntry logEntry = new LogEntry(RootTable.EXTENT, 
System.currentTimeMillis(), tls.futureOrCurrent().getLocation().toString(), 
entry.toString());
           byte[] value;
           try {
             value = logEntry.toBytes();
@@ -196,7 +197,7 @@ public class ZooTabletStateStore extends TabletStateStore {
   }
 
   @Override
-  public void markLogsAsUnused(AccumuloServerContext context, 
Map<TServerInstance,List<String>> logs) {
+  public void markLogsAsUnused(AccumuloServerContext context, 
Map<TServerInstance,List<Path>> logs) {
     // the root table is not replicated, so unassigning the root tablet has 
removed the current log marker
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index ebf4b1b..96f9c9e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -89,7 +89,6 @@ import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -1061,22 +1060,21 @@ public class MetadataTableUtil {
     return tabletEntries;
   }
 
-  public static void addNewLogMarker(ClientContext context, ZooLock zooLock, 
final TServerInstance tabletSession, final String filename, KeyExtent extent) {
+  public static void addNewLogMarker(ClientContext context, ZooLock zooLock, 
final TServerInstance tabletSession, final Path filename, KeyExtent extent) {
     log.debug("Adding log entry " + filename);
     if (extent.isRootTablet()) {
       retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
         @Override
         public void run(IZooReaderWriter rw) throws KeeperException, 
InterruptedException, IOException {
           String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + 
RootTable.ZROOT_TABLET_CURRENT_LOGS;
-          String[] parts = StringUtils.split(filename, '/');
-          String uniqueId = parts[parts.length - 1];
+          String uniqueId = filename.getName();
           String path = root + "/" + CurrentLogsSection.getRowPrefix() + 
tabletSession.toString() + uniqueId;
-          rw.putPersistentData(path, filename.getBytes(UTF_8), 
NodeExistsPolicy.OVERWRITE);
+          rw.putPersistentData(path, filename.toString().getBytes(UTF_8), 
NodeExistsPolicy.OVERWRITE);
         }
       });
     } else {
       Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + 
tabletSession.toString());
-      m.put(CurrentLogsSection.COLF, new Text(filename), new 
Value(EMPTY_BYTES));
+      m.put(CurrentLogsSection.COLF, new Text(filename.toString()), new 
Value(EMPTY_BYTES));
       String tableName = MetadataTable.NAME;
       if (extent.isMeta()) {
         tableName = RootTable.NAME;
@@ -1091,13 +1089,12 @@ public class MetadataTableUtil {
     }
   }
 
-  private static void removeCurrentRootLogMarker(ClientContext context, 
ZooLock zooLock, final TServerInstance tabletSession, final String filename) {
+  private static void removeCurrentRootLogMarker(ClientContext context, 
ZooLock zooLock, final TServerInstance tabletSession, final Path filename) {
     retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
       @Override
       public void run(IZooReaderWriter rw) throws KeeperException, 
InterruptedException, IOException {
         String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + 
RootTable.ZROOT_TABLET_CURRENT_LOGS;
-        String[] parts = StringUtils.split(filename, '/');
-        String uniqueId = parts[parts.length - 1];
+        String uniqueId = filename.getName();
         String path = root + "/" + CurrentLogsSection.getRowPrefix() + 
tabletSession.toString() + uniqueId;
         log.debug("Removing entry " + path + " from zookeeper");
         rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
@@ -1105,12 +1102,12 @@ public class MetadataTableUtil {
     });
   }
 
-  public static void markLogUnused(ClientContext context, ZooLock lock, 
TServerInstance tabletSession, Set<String> all) throws AccumuloException {
+  public static void markLogUnused(ClientContext context, ZooLock lock, 
TServerInstance tabletSession, Set<Path> all) throws AccumuloException {
     try {
       BatchWriter root = 
context.getConnector().createBatchWriter(RootTable.NAME, null);
       BatchWriter meta = 
context.getConnector().createBatchWriter(MetadataTable.NAME, null);
-      for (String fname : all) {
-        Text tname = new Text(fname.getBytes(UTF_8));
+      for (Path fname : all) {
+        Text tname = new Text(fname.toString());
         Mutation m = new 
Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + 
tabletSession.toString());
         m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
         root.addMutation(m);
@@ -1127,21 +1124,21 @@ public class MetadataTableUtil {
     }
   }
 
-  public static void fetchLogsForDeadServer(ClientContext context, ZooLock 
lock, KeyExtent extent, TServerInstance server, 
Map<TServerInstance,List<String>> logsForDeadServers)
+  public static void fetchLogsForDeadServer(ClientContext context, ZooLock 
lock, KeyExtent extent, TServerInstance server, Map<TServerInstance,List<Path>> 
logsForDeadServers)
       throws TableNotFoundException, AccumuloException, 
AccumuloSecurityException {
     // already cached
     if (logsForDeadServers.containsKey(server)) {
       return;
     }
     if (extent.isRootTablet()) {
-      final List<String> logs = new ArrayList<>();
+      final List<Path> logs = new ArrayList<>();
       retryZooKeeperUpdate(context, lock, new ZooOperation() {
         @Override
         public void run(IZooReaderWriter rw) throws KeeperException, 
InterruptedException, IOException {
           String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + 
RootTable.ZROOT_TABLET_CURRENT_LOGS;
           logs.clear();
           for (String child : rw.getChildren(root)) {
-            logs.add(new String(rw.getData(root + "/" + child, null), UTF_8));
+            logs.add(new Path(new String(rw.getData(root + "/" + child, null), 
UTF_8)));
           }
         }
       });
@@ -1155,9 +1152,9 @@ public class MetadataTableUtil {
       // fetch the current logs in use, and put them in the cache
       Scanner scanner = context.getConnector().createScanner(table, 
Authorizations.EMPTY);
       scanner.setRange(new 
Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString()));
-      List<String> logs = new ArrayList<>();
+      List<Path> logs = new ArrayList<>();
       for (Entry<Key,Value> entry : scanner) {
-        logs.add(entry.getKey().getColumnQualifier().toString());
+        logs.add(new Path(entry.getKey().getColumnQualifier().toString()));
       }
       logsForDeadServers.put(server, logs);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index a7703e9..444789b 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -81,6 +81,7 @@ public class GarbageCollectWriteAheadLogs {
   private final AccumuloServerContext context;
   private final VolumeManager fs;
   private final boolean useTrash;
+  private final LiveTServerSet liveServers;
 
   /**
    * Creates a new GC WAL object.
@@ -96,24 +97,26 @@ public class GarbageCollectWriteAheadLogs {
     this.context = context;
     this.fs = fs;
     this.useTrash = useTrash;
+    this.liveServers = new LiveTServerSet(context, new Listener() {
+      @Override
+      public void update(LiveTServerSet current, Set<TServerInstance> deleted, 
Set<TServerInstance> added) {
+        log.debug("New tablet servers noticed: " + added);
+        log.debug("Tablet servers removed: " + deleted);
+      }
+    });
+    liveServers.startListeningForTabletServerChanges();
   }
 
   public void collect(GCStatus status) {
 
     Span span = Trace.start("getCandidates");
     try {
-      LiveTServerSet liveServers = new LiveTServerSet(context, new Listener() {
-        @Override
-        public void update(LiveTServerSet current, Set<TServerInstance> 
deleted, Set<TServerInstance> added) {
-          log.debug("New tablet servers noticed: " + added);
-          log.debug("Tablet servers removed: " + deleted);
-        }
-      });
       Set<TServerInstance> currentServers = liveServers.getCurrentServers();
 
+
       status.currentLog.started = System.currentTimeMillis();
 
-      Map<TServerInstance, Set<String> > candidates = new HashMap<>();
+      Map<TServerInstance, Set<Path> > candidates = new HashMap<>();
       long count = getCurrent(candidates, currentServers);
       long fileScanStop = System.currentTimeMillis();
 
@@ -174,15 +177,15 @@ public class GarbageCollectWriteAheadLogs {
     }
   }
 
-  private long removeMarkers(Map<TServerInstance,Set<String>> candidates) {
+  private long removeMarkers(Map<TServerInstance,Set<Path>> candidates) {
     long result = 0;
     try {
       BatchWriter root = 
context.getConnector().createBatchWriter(RootTable.NAME, null);
       BatchWriter meta = 
context.getConnector().createBatchWriter(MetadataTable.NAME, null);
-      for (Entry<TServerInstance,Set<String>> entry : candidates.entrySet()) {
-        Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + 
entry.toString());
-        for (String wal : entry.getValue()) {
-          m.putDelete(CurrentLogsSection.COLF, new Text(wal));
+      for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
+        Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + 
entry.getKey().toString());
+        for (Path path : entry.getValue()) {
+          m.putDelete(CurrentLogsSection.COLF, new Text(path.toString()));
           result++;
         }
         root.addMutation(m);
@@ -196,11 +199,10 @@ public class GarbageCollectWriteAheadLogs {
     return result;
   }
 
-  private long removeFiles(Map<TServerInstance, Set<String> > candidates, 
final GCStatus status) {
-    for (Entry<TServerInstance,Set<String>> entry : candidates.entrySet()) {
-      for (String walog : entry.getValue()) {
-        log.debug("Removing WAL for offline server " + entry.getKey() + " log 
" + walog);
-        Path path = new Path(walog);
+  private long removeFiles(Map<TServerInstance, Set<Path> > candidates, final 
GCStatus status) {
+    for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
+      for (Path path : entry.getValue()) {
+        log.debug("Removing unused WAL for server " + entry.getKey() + " log " 
+ path);
         try {
           if (!useTrash || !fs.moveToTrash(path))
             fs.deleteRecursively(path);
@@ -215,14 +217,14 @@ public class GarbageCollectWriteAheadLogs {
     return status.currentLog.deleted;
   }
 
-  private long removeMetadataEntries(Map<TServerInstance, Set<String> > 
candidates, GCStatus status, Set<TServerInstance> liveServers) throws 
IOException, KeeperException,
+  private long removeMetadataEntries(Map<TServerInstance, Set<Path> > 
candidates, GCStatus status, Set<TServerInstance> liveServers) throws 
IOException, KeeperException,
       InterruptedException {
 
     // remove any entries if there's a log reference, or a tablet is still 
assigned to the dead server
 
-    Map<String, TServerInstance> walToDeadServer = new HashMap<>();
-    for (Entry<TServerInstance,Set<String>> entry : candidates.entrySet()) {
-      for (String file : entry.getValue()) {
+    Map<Path, TServerInstance> walToDeadServer = new HashMap<>();
+    for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
+      for (Path file : entry.getValue()) {
         walToDeadServer.put(file, entry.getKey());
       }
     }
@@ -248,7 +250,7 @@ public class GarbageCollectWriteAheadLogs {
     return count;
   }
 
-  protected int removeReplicationEntries(Map<TServerInstance, Set<String> > 
candidates, GCStatus status) throws IOException, KeeperException,
+  protected int removeReplicationEntries(Map<TServerInstance, Set<Path> > 
candidates, GCStatus status) throws IOException, KeeperException,
   InterruptedException {
     Connector conn;
     try {
@@ -260,13 +262,13 @@ public class GarbageCollectWriteAheadLogs {
 
     int count = 0;
 
-    Iterator<Entry<TServerInstance,Set<String>>> walIter = 
candidates.entrySet().iterator();
+    Iterator<Entry<TServerInstance,Set<Path>>> walIter = 
candidates.entrySet().iterator();
 
     while (walIter.hasNext()) {
-      Entry<TServerInstance,Set<String>> wal = walIter.next();
-      Iterator<String> paths = wal.getValue().iterator();
+      Entry<TServerInstance,Set<Path>> wal = walIter.next();
+      Iterator<Path> paths = wal.getValue().iterator();
       while (paths.hasNext()) {
-        String fullPath = paths.next();
+        Path fullPath = paths.next();
         if (neededByReplication(conn, fullPath)) {
           log.debug("Removing WAL from candidate deletion as it is still 
needed for replication: {}", fullPath);
           // If we haven't already removed it, check to see if this WAL is
@@ -294,7 +296,7 @@ public class GarbageCollectWriteAheadLogs {
    *          The full path (URI)
    * @return True if the WAL is still needed by replication (not a candidate 
for deletion)
    */
-  protected boolean neededByReplication(Connector conn, String wal) {
+  protected boolean neededByReplication(Connector conn, Path wal) {
     log.info("Checking replication table for " + wal);
 
     Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
@@ -317,7 +319,7 @@ public class GarbageCollectWriteAheadLogs {
     return false;
   }
 
-  protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector 
conn, String wal) {
+  protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector 
conn, Path wal) {
     Scanner metaScanner;
     try {
       metaScanner = conn.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY);
@@ -337,7 +339,7 @@ public class GarbageCollectWriteAheadLogs {
       StatusSection.limit(replScanner);
 
       // Only look for this specific WAL
-      replScanner.setRange(Range.exact(wal));
+      replScanner.setRange(Range.exact(wal.toString()));
 
       return Iterables.concat(metaScanner, replScanner);
     } catch (ReplicationTableOfflineException e) {
@@ -353,19 +355,19 @@ public class GarbageCollectWriteAheadLogs {
   /**
    * Scans log markers. The map passed in is populated with the logs for dead 
servers.
    *
-   * @param logsForDeadServers
+   * @param unusedLogs
    *          map of dead server to log file entries
    * @return total number of log files
    */
-  private long getCurrent(Map<TServerInstance, Set<String> > 
logsForDeadServers, Set<TServerInstance> currentServers) throws Exception {
-    Set<String> rootWALs = new HashSet<String>();
+  private long getCurrent(Map<TServerInstance, Set<Path> > unusedLogs, 
Set<TServerInstance> currentServers) throws Exception {
+    Set<Path> rootWALs = new HashSet<>();
     // Get entries in zookeeper:
     String zpath = ZooUtil.getRoot(context.getInstance()) + 
RootTable.ZROOT_TABLET_WALOGS;
     ZooReaderWriter zoo = ZooReaderWriter.getInstance();
     List<String> children = zoo.getChildren(zpath);
     for (String child : children) {
       LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, 
null));
-      rootWALs.add(entry.filename);
+      rootWALs.add(new Path(entry.filename));
     }
     long count = 0;
 
@@ -383,12 +385,13 @@ public class GarbageCollectWriteAheadLogs {
       CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, 
sessionId);
       CurrentLogsSection.getPath(entry.getKey(), filename);
       TServerInstance tsi = new 
TServerInstance(HostAndPort.fromString(hostAndPort.toString()), 
sessionId.toString());
-      if ((!currentServers.contains(tsi) || 
entry.getValue().equals(CurrentLogsSection.UNUSED)) && 
!rootWALs.contains(filename)) {
-        Set<String> logs = logsForDeadServers.get(tsi);
+      Path path = new Path(filename.toString());
+      if ((!currentServers.contains(tsi) || 
(entry.getValue().equals(CurrentLogsSection.UNUSED)) && 
!rootWALs.contains(path))) {
+        Set<Path> logs = unusedLogs.get(tsi);
         if (logs == null) {
-          logsForDeadServers.put(tsi, logs = new HashSet<String>());
+          unusedLogs.put(tsi, logs = new HashSet<Path>());
         }
-        if (logs.add(new Path(filename.toString()).toString())) {
+        if (logs.add(path)) {
           count++;
         }
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index edea93f..4d237a1 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -161,7 +161,7 @@ class TabletGroupWatcher extends Daemon {
         List<Assignment> assigned = new ArrayList<Assignment>();
         List<TabletLocationState> assignedToDeadServers = new 
ArrayList<TabletLocationState>();
         Map<KeyExtent,TServerInstance> unassigned = new 
HashMap<KeyExtent,TServerInstance>();
-        Map<TServerInstance, List<String>> logsForDeadServers = new 
TreeMap<>();
+        Map<TServerInstance, List<Path>> logsForDeadServers = new TreeMap<>();
 
         MasterState masterState = master.getMasterState();
         int[] counts = new int[TabletState.values().length];
@@ -204,8 +204,9 @@ class TabletGroupWatcher extends Daemon {
           TabletGoalState goal = this.master.getGoalState(tls, 
mergeStats.getMergeInfo());
           TServerInstance server = tls.getServer();
           TabletState state = tls.getState(currentTServers.keySet());
-          if (Master.log.isTraceEnabled())
-            Master.log.trace("Goal state " + goal + " current " + state);
+          if (Master.log.isTraceEnabled()) {
+            Master.log.trace("Goal state " + goal + " current " + state + " 
for " + tls.extent);
+          }
           stats.update(tableId, state);
           mergeStats.update(tls.extent, state, tls.chopped, 
!tls.walogs.isEmpty());
           sendChopRequest(mergeStats.getMergeInfo(), state, tls);
@@ -308,8 +309,10 @@ class TabletGroupWatcher extends Daemon {
 
         updateMergeState(mergeStatsCache);
 
-        Master.log.debug(String.format("[%s] sleeping for %.2f seconds", 
store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
-        eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+        if 
(this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) {
+          Master.log.debug(String.format("[%s] sleeping for %.2f seconds", 
store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
+          eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+        }
       } catch (Exception ex) {
         Master.log.error("Error processing table state for store " + 
store.name(), ex);
         if (ex.getCause() != null && ex.getCause() instanceof 
BadLocationStateException) {
@@ -731,7 +734,7 @@ class TabletGroupWatcher extends Daemon {
       List<Assignment> assignments,
       List<Assignment> assigned,
       List<TabletLocationState> assignedToDeadServers,
-      Map<TServerInstance, List<String>> logsForDeadServers,
+      Map<TServerInstance, List<Path>> logsForDeadServers,
       Map<KeyExtent,TServerInstance> unassigned)
           throws DistributedStoreException, TException {
     if (!assignedToDeadServers.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 3b7ff03..ffc1c2a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3000,9 +3000,9 @@ public class TabletServer extends AccumuloServerContext 
implements Runnable {
       candidates.removeAll(tablet.getCurrentLogFiles());
     }
     try {
-      Set<String> filenames = new HashSet<>();
+      Set<Path> filenames = new HashSet<>();
       for (DfsLogger candidate : candidates) {
-        filenames.add(candidate.getFileName());
+        filenames.add(candidate.getPath());
       }
       MetadataTableUtil.markLogUnused(this, this.getLock(), 
this.getTabletSession(), filenames);
       synchronized (closedLogs) {
@@ -3019,7 +3019,7 @@ public class TabletServer extends AccumuloServerContext 
implements Runnable {
       EnumSet<TabletLevel> set = metadataTableLogs.putIfAbsent(copy, 
EnumSet.of(level));
       if (set == null || !set.contains(level) || level == TabletLevel.ROOT) {
         log.info("Writing log marker for level " + level + " " + 
copy.getFileName());
-        MetadataTableUtil.addNewLogMarker(this, this.getLock(), 
this.getTabletSession(), copy.getFileName(), extent);
+        MetadataTableUtil.addNewLogMarker(this, this.getLock(), 
this.getTabletSession(), copy.getPath(), extent);
         if (set != null) {
           set.add(level);
         }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index f8bcfbc..e256604 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -475,7 +475,11 @@ public class DfsLogger implements Comparable<DfsLogger> {
   }
 
   public String getFileName() {
-    return logPath.toString();
+    return logPath;
+  }
+
+  public Path getPath() {
+    return new Path(logPath);
   }
 
   public void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/98c3cef8/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java 
b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
new file mode 100644
index 0000000..b8e36bc
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.core.conf.Property.GC_CYCLE_DELAY;
+import static org.apache.accumulo.core.conf.Property.GC_CYCLE_START;
+import static org.apache.accumulo.core.conf.Property.INSTANCE_ZK_TIMEOUT;
+import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE;
+import static org.apache.accumulo.core.conf.Property.TSERV_WAL_REPLICATION;
+import static org.apache.accumulo.core.security.Authorizations.EMPTY;
+import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR;
+import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.master.state.SetGoalState;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class WALSunnyDayIT extends ConfigurableMacIT {
+
+  private static final Text CF = new Text(new byte[0]);
+
+  /**
+   * Applies the cluster settings this test relies on: one tablet server, short garbage collector
+   * timings, a small maximum WAL size and a short ZooKeeper timeout.
+   *
+   * @param cfg
+   *          mini cluster configuration to adjust
+   * @param hadoopCoreSite
+   *          Hadoop configuration to adjust
+   */
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // use the raw local file system implementation for file: paths
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+
+    // a single tablet server
+    cfg.setNumTservers(1);
+
+    // garbage collector timing
+    cfg.setProperty(GC_CYCLE_START, "0s");
+    cfg.setProperty(GC_CYCLE_DELAY, "1s");
+
+    // write-ahead log size and replication
+    cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M");
+    cfg.setProperty(TSERV_WAL_REPLICATION, "1");
+
+    // ZooKeeper session timeout
+    cfg.setProperty(INSTANCE_ZK_TIMEOUT, "3s");
+  }
+
+  /**
+   * Counts the {@code true} entries of a collection.
+   *
+   * @param bools
+   *          values to inspect; a {@code null} element raises a NullPointerException on unboxing
+   * @return number of elements equal to {@code true}
+   */
+  int countTrue(Collection<Boolean> bools) {
+    int trueCount = 0;
+    Iterator<Boolean> flags = bools.iterator();
+    while (flags.hasNext()) {
+      boolean flag = flags.next();
+      trueCount += flag ? 1 : 0;
+    }
+    return trueCount;
+  }
+
+  /**
+   * Exercises the write-ahead log markers through a normal life cycle on a single tablet server:
+   * initial write, log roll, flush, garbage collection, tablet server restart in SAFE_MODE, and a
+   * final garbage collection after returning to NORMAL. After each step the WAL markers reported by
+   * {@link #getWals(Connector, ZooKeeper)} (and, after the restart, the recovery markers from
+   * {@link #getRecoveryMarkers(Connector)}) are checked against the expected counts.
+   *
+   * @throws Exception
+   *           on any cluster, ZooKeeper or assertion failure
+   */
+  @Test
+  public void test() throws Exception {
+    MiniAccumuloClusterImpl mac = getCluster();
+    MiniAccumuloClusterControl control = mac.getClusterControl();
+    control.stop(GARBAGE_COLLECTOR);
+    Connector c = getConnector();
+    ZooKeeper zoo = new ZooKeeper(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut(), new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        log.info(event.toString());
+      }
+    });
+    try {
+      String tableName = getUniqueNames(1)[0];
+      c.tableOperations().create(tableName);
+      writeSomeData(c, tableName, 1, 1);
+
+      // should have two markers: wal in use, and next wal
+      Map<String,Boolean> wals = getWals(c, zoo);
+      assertEquals(wals.toString(), 2, wals.size());
+      for (Boolean b : wals.values()) {
+        assertTrue("logs should be in use", b.booleanValue());
+      }
+
+      // roll log, get a new next
+      writeSomeData(c, tableName, 1000, 50);
+      Map<String,Boolean> walsAfterRoll = getWals(c, zoo);
+      assertEquals("should have 3 WALs after roll", 3, walsAfterRoll.size());
+      assertTrue("new WALs should be a superset of the old WALs", walsAfterRoll.keySet().containsAll(wals.keySet()));
+      assertEquals("all WALs should be in use", 3, countTrue(walsAfterRoll.values()));
+
+      // flush the tables
+      for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) {
+        c.tableOperations().flush(table, null, null, true);
+      }
+      // rolled WAL is no longer in use, but needs to be GC'd
+      Map<String,Boolean> walsAfterFlush = getWals(c, zoo);
+      assertEquals(walsAfterFlush.toString(), 3, walsAfterFlush.size());
+      assertEquals("inUse should be 2", 2, countTrue(walsAfterFlush.values()));
+
+      // let the GC run for a little bit
+      control.start(GARBAGE_COLLECTOR);
+      UtilWaitThread.sleep(5 * 1000);
+      // make sure the unused WAL goes away
+      Map<String,Boolean> walsAfterGC = getWals(c, zoo);
+      assertEquals(walsAfterGC.toString(), 2, walsAfterGC.size());
+      control.stop(GARBAGE_COLLECTOR);
+      // restart the tserver, but don't run recovery on all tablets
+      control.stop(TABLET_SERVER);
+      // this delays recovery on the normal tables
+      assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").waitFor());
+      control.start(TABLET_SERVER);
+
+      // wait for the metadata table to go back online
+      getRecoveryMarkers(c);
+      // allow a little time for the master to notice ASSIGNED_TO_DEAD_SERVER tablets
+      UtilWaitThread.sleep(5 * 1000);
+      Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
+      assertEquals("one tablet should have markers", 1, markers.keySet().size());
+      // expected value first, so a failure message reports the values the right way round
+      assertEquals("tableId of the keyExtent should be 1", new Text("1"), markers.keySet().iterator().next().getTableId());
+
+      // put some data in the WAL
+      assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
+      writeSomeData(c, tableName, 100, 100);
+
+      Map<String,Boolean> walsAfterRestart = getWals(c, zoo);
+      assertEquals("used WALs after restart should be 2", 2, countTrue(walsAfterRestart.values()));
+      control.start(GARBAGE_COLLECTOR);
+      UtilWaitThread.sleep(5 * 1000);
+      Map<String,Boolean> walsAfterRestartAndGC = getWals(c, zoo);
+      assertEquals("wals left should be 2", 2, walsAfterRestartAndGC.size());
+      assertEquals("logs in use should be 2", 2, countTrue(walsAfterRestartAndGC.values()));
+    } finally {
+      // release the ZooKeeper session opened above, even when an assertion fails
+      zoo.close();
+    }
+  }
+
+  /**
+   * Writes random data to a table: {@code row} mutations with random 10-byte row ids, each
+   * carrying {@code col} entries with random 10-byte qualifiers and values under the empty column
+   * family. The writer is flushed on every 100th mutation, starting with the first.
+   *
+   * @param conn
+   *          connector used to create the batch writer
+   * @param tableName
+   *          table to write to
+   * @param row
+   *          number of mutations to write
+   * @param col
+   *          number of entries per mutation
+   * @throws Exception
+   *           if the writer cannot be created or a write, flush or close fails
+   */
+  private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception {
+    Random rand = new Random();
+    byte[] rowData = new byte[10];
+    byte[] cq = new byte[10];
+    byte[] value = new byte[10];
+
+    BatchWriter bw = conn.createBatchWriter(tableName, null);
+    try {
+      for (int r = 0; r < row; r++) {
+        rand.nextBytes(rowData);
+        Mutation m = new Mutation(rowData);
+        for (int c = 0; c < col; c++) {
+          rand.nextBytes(cq);
+          rand.nextBytes(value);
+          m.put(CF, new Text(cq), new Value(value));
+        }
+        bw.addMutation(m);
+        if (r % 100 == 0) {
+          bw.flush();
+        }
+      }
+    } finally {
+      // always release the writer, including when a write or flush fails part-way through
+      bw.close();
+    }
+  }
+
+  /**
+   * Collects the WAL markers visible in the root table, the metadata table and ZooKeeper.
+   *
+   * @param c
+   *          connector used to scan the root and metadata tables
+   * @param zoo
+   *          ZooKeeper handle used to read the children of the root tablet's current-logs node
+   * @return map keyed by WAL path; for table entries the value is {@code true} when the stored
+   *         value is empty, and entries read from ZooKeeper are always {@code true}
+   * @throws Exception
+   *           if a scan or a ZooKeeper read fails
+   */
+  private Map<String, Boolean> getWals(Connector c, ZooKeeper zoo) throws Exception {
+    Map<String, Boolean> result = new HashMap<>();
+    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
+    root.setRange(CurrentLogsSection.getRange());
+    Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
+    meta.setRange(root.getRange());
+    Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
+    while (both.hasNext()) {
+      Entry<Key,Value> entry = both.next();
+      Text path = new Text();
+      CurrentLogsSection.getPath(entry.getKey(), path);
+      result.put(path.toString(), entry.getValue().get().length == 0);
+    }
+    String zpath = ZooUtil.getRoot(c.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+    List<String> children = zoo.getChildren(zpath, null);
+    for (String child : children) {
+      byte[] data = zoo.getData(zpath + "/" + child, null, null);
+      // decode explicitly instead of relying on the platform default charset
+      // NOTE(review): assumes the node data is written as UTF-8 - confirm against the writer
+      result.put(new String(data, java.nio.charset.StandardCharsets.UTF_8), true);
+    }
+    return result;
+  }
+
+  /**
+   * Scans the tablets section of the root and metadata tables and groups log column qualifiers by
+   * tablet. Log qualifiers are accumulated until a prev-row column is seen; at that point, if any
+   * are pending, they are stored under the extent built from that entry and a fresh list is
+   * started. Tablets without log entries do not appear in the result.
+   *
+   * @param c
+   *          connector used to scan both tables
+   * @return map from tablet extent to the log column qualifiers recorded for it
+   * @throws Exception
+   *           if a scan fails
+   */
+  private Map<KeyExtent, List<String>> getRecoveryMarkers(Connector c) throws Exception {
+    Scanner rootScanner = c.createScanner(RootTable.NAME, EMPTY);
+    rootScanner.setRange(TabletsSection.getRange());
+    rootScanner.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+    TabletColumnFamily.PREV_ROW_COLUMN.fetch(rootScanner);
+
+    Scanner metaScanner = c.createScanner(MetadataTable.NAME, EMPTY);
+    metaScanner.setRange(TabletsSection.getRange());
+    metaScanner.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+    TabletColumnFamily.PREV_ROW_COLUMN.fetch(metaScanner);
+
+    Map<KeyExtent, List<String>> markers = new HashMap<>();
+    List<String> pending = new ArrayList<>();
+    Iterator<Entry<Key,Value>> entries = Iterators.concat(rootScanner.iterator(), metaScanner.iterator());
+    while (entries.hasNext()) {
+      Entry<Key,Value> kv = entries.next();
+      Key key = kv.getKey();
+      if (key.getColumnFamily().equals(TabletsSection.LogColumnFamily.NAME)) {
+        pending.add(key.getColumnQualifier().toString());
+      }
+      // only a prev-row column with pending log entries closes out a tablet
+      if (pending.isEmpty() || !TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
+        continue;
+      }
+      markers.put(new KeyExtent(key.getRow(), kv.getValue()), pending);
+      pending = new ArrayList<>();
+    }
+    return markers;
+  }
+
+}

Reply via email to