Modified: 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1344396&r1=1344395&r2=1344396&view=diff
==============================================================================
--- 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
 (original)
+++ 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
 Wed May 30 18:34:43 2012
@@ -21,23 +21,39 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.RecoveryStatus;
 import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.server.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.MapFile;
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 
 /**
  * 
@@ -48,30 +64,37 @@ public class LogSorter {
   FileSystem fs;
   AccumuloConfiguration conf;
   
+  private Map<String,Work> currentWork = new HashMap<String,Work>();
+
   class Work implements Runnable {
     final String name;
     FSDataInputStream input;
     final String destPath;
     long bytesCopied = -1;
+    long sortStart = 0;
+    long sortStop = -1;
+    private final LogSortNotifier cback;
     
     synchronized long getBytesCopied() throws IOException {
       return input == null ? bytesCopied : input.getPos();
     }
     
-    Work(String name, FSDataInputStream input, String destPath) {
+    Work(String name, FSDataInputStream input, String destPath, 
LogSortNotifier cback) {
       this.name = name;
       this.input = input;
       this.destPath = destPath;
+      this.cback = cback;
     }
     synchronized boolean finished() {
       return input == null;
     }
     public void run() {
+      sortStart = System.currentTimeMillis();
       String formerThreadName = Thread.currentThread().getName();
+      int part = 0;
       try {
         final long bufferSize = 
conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
         Thread.currentThread().setName("Sorting " + name + " for recovery");
-        int part = 0;
         while (true) {
           final ArrayList<Pair<LogFileKey, LogFileValue>> buffer = new 
ArrayList<Pair<LogFileKey, LogFileValue>>();
           try {
@@ -99,6 +122,11 @@ public class LogSorter {
           log.error("Error creating failed flag file " + name, e);
         }
         log.error(t, t);
+        try {
+          cback.notice(name, getBytesCopied(), part, getSortTime(), 
t.toString());
+        } catch (Exception ex) {
+          log.error("Strange error notifying the master of a logSort problem 
for file " + name);
+        }
       } finally {
         Thread.currentThread().setName(formerThreadName);
         try {
@@ -106,6 +134,15 @@ public class LogSorter {
         } catch (IOException e) {
           log.error("Error during cleanup sort/copy " + name, e);
         }
+        sortStop = System.currentTimeMillis();
+        synchronized (currentWork) {
+          currentWork.remove(name);
+        }
+        try {
+          cback.notice(name, getBytesCopied(), part, getSortTime(), "");
+        } catch (Exception ex) {
+          log.error("Strange error reporting successful log sort " + name, ex);
+        }
       }
     }
     
@@ -132,56 +169,151 @@ public class LogSorter {
       input.close();
       input = null;
     }
+    
+    public synchronized long getSortTime() {
+      if (sortStart > 0) {
+        if (sortStop > 0)
+          return sortStop - sortStart;
+        return System.currentTimeMillis() - sortStart;
+      }
+      return 0;
+    }
   };
   
-  final ExecutorService threadPool;
-  Map<String,Work> sorts = new ConcurrentHashMap<String,Work>();
+  final ThreadPoolExecutor threadPool;
+  private Instance instance;
   
-  public LogSorter(FileSystem fs, AccumuloConfiguration conf) {
+  public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration 
conf) {
+    this.instance = instance;
     this.fs = fs;
     this.conf = conf;
     int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
-    this.threadPool = Executors.newFixedThreadPool(threadPoolSize);
+    this.threadPool = new ThreadPoolExecutor(threadPoolSize, threadPoolSize,
+        0L, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>());
   }
   
-  public double sort(String src, String dest) throws IOException {
-    synchronized (this) {
-      Work work = sorts.get(src);
-      if (work == null) {
-        work = startSort(src, dest);
-        sorts.put(src, work);
-      } else {
-        if (work.finished())
-          sorts.remove(src);
+  public void startWatchingForRecoveryLogs(final String serverName) throws 
KeeperException, InterruptedException {
+    final String path = ZooUtil.getRoot(instance) + Constants.ZRECOVERY;
+    final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    zoo.mkdirs(path);
+    List<String> children = zoo.getChildren(path, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        switch (event.getType()) {
+          case NodeChildrenChanged:
+            if (event.getPath().equals(path))
+              try {
+                attemptRecoveries(zoo, serverName, path, 
zoo.getChildren(path));
+              } catch (KeeperException e) {
+                log.error("Unable to get recovery information", e);
+              } catch (InterruptedException e) {
+                log.info("Interrupted getting recovery information", e);
+              }
+            else
+              log.info("Unexpected path for NodeChildrenChanged event " + 
event.getPath());
+            break;
+          case NodeCreated:
+          case NodeDataChanged:
+          case NodeDeleted:
+          case None:
+            log.info("Got unexpected zookeeper event: " + event.getType() + " 
for " + path);
+            break;
+          
+        }
       }
-      long bytesCopied = work.getBytesCopied();
-      long estimate = conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
-      return bytesCopied / ((double) estimate);
-    }
+    });
+    attemptRecoveries(zoo, serverName, path, children);
+    Random r = new Random();
+    // Add a little jitter to avoid all the tservers slamming zookeeper at once
+    SimpleTimer.getInstance().schedule(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
+        } catch (KeeperException e) {
+          log.error("Unable to get recovery information", e);
+        } catch (InterruptedException e) {
+          log.info("Interrupted getting recovery information", e);
+        }        
+      }
+    }, r.nextInt(1000), 60 * 1000);
   }
   
-  private Work startSort(String src, String dest) throws IOException {
+  private void attemptRecoveries(final ZooReaderWriter zoo, final String 
serverName, String path, List<String> children) {
+    if (children.size() == 0)
+      return;
+    log.info("Zookeeper references " + children.size() + " recoveries, 
attempting locks");
+    Random random = new Random();
+    Collections.shuffle(children, random);
+    try {
+      for (String child : children) {
+        final String childPath = path + "/" + child;
+        log.debug("Attempting to lock " + child);
+        ZooLock lock = new ZooLock(childPath);
+        if (lock.tryLock(new LockWatcher() {
+          @Override
+          public void lostLock(LockLossReason reason) {
+            log.info("Ignoring lost lock event, reason " + reason);
+          }
+        }, serverName.getBytes())) {
+          // Great... we got the lock, but maybe we're too busy
+          if (threadPool.getQueue().size() > 1) {
+            lock.unlock();
+            continue;
+          }
+          byte[] contents = zoo.getData(childPath, null);
+          String destination = Constants.getRecoveryDir(conf) + "/" + child;
+          startSort(new String(contents), destination, new LogSortNotifier() {
+            @Override
+            public void notice(String name, long bytes, int parts, long 
milliseconds, String error) {
+              log.info("Finished log sort " + name + " " + bytes + " bytes " + 
parts + " parts in " + milliseconds + "ms");
+              try {
+                zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP);
+              } catch (Exception e) {
+                log.error("Error received when trying to delete recovery entry 
in zookeeper " + childPath);
+              }
+            }
+          });
+        }
+      }
+    } catch (Throwable t) {
+      log.error("Unexpected error", t);
+    }
+  }
+
+  public interface LogSortNotifier {
+    public void notice(String name, long bytes, int parts, long milliseconds, 
String error);
+  }
+
+  private void startSort(String src, String dest, LogSortNotifier cback) 
throws IOException {
     log.info("Copying " + src + " to " + dest);
+    fs.delete(new Path(dest), true);
     Path srcPath = new Path(src);
-    while (true) {
-      try {
-        if (fs instanceof DistributedFileSystem) {
-          DistributedFileSystem dfs = (DistributedFileSystem) fs;
-          dfs.recoverLease(srcPath);
-          log.debug("recovered lease on " + srcPath);
-        } else {
-          fs.append(srcPath).close();
-          log.debug("successfully appended to " + srcPath);
-        }
-        break;
-      } catch (IOException e) {
-        log.debug("error recovering lease on " + srcPath, e);
-        UtilWaitThread.sleep(1000);
-        log.debug("retrying lease recovery on " + srcPath);
-      }
-    }
-    Work work = new Work(srcPath.getName(), fs.open(srcPath), dest);
-    threadPool.execute(work);
-    return work;
+    synchronized (currentWork) {
+      Work work = new Work(srcPath.getName(), fs.open(srcPath), dest, cback);
+      if (!currentWork.containsKey(srcPath.getName())) {
+        threadPool.execute(work);
+        currentWork.put(srcPath.getName(), work);
+      }
+    }
+  }
+  
+  public List<RecoveryStatus> getLogSorts() {
+    List<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
+    synchronized (currentWork) {
+      for (Entry<String,Work> entries : currentWork.entrySet()) {
+        RecoveryStatus status = new RecoveryStatus();
+        status.name = entries.getKey();
+        try {
+          status.progress = entries.getValue().getBytesCopied() / (0.0 + 
conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE));
+        } catch (IOException ex) {
+          log.warn("Error getting bytes read");
+        }
+        status.runtime = (int) entries.getValue().getSortTime();
+        result.add(status);
+      }
+      return result;
+    }
   }
 }

Modified: 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1344396&r1=1344395&r2=1344396&view=diff
==============================================================================
--- 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
 (original)
+++ 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
 Wed May 30 18:34:43 2012
@@ -59,7 +59,7 @@ public class TabletServerLogger {
   private final TabletServer tserver;
   
   // The current log set: always updated to a new set with every change of 
loggers
-  private final List<IRemoteLogger> loggers = new ArrayList<IRemoteLogger>();
+  private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
   
   // The current generation of logSet.
   // Because multiple threads can be using a log set at one time, a log
@@ -132,7 +132,7 @@ public class TabletServerLogger {
     this.maxSize = maxSize;
   }
   
-  private int initializeLoggers(final List<IRemoteLogger> copy) throws 
IOException {
+  private int initializeLoggers(final List<DfsLogger> copy) throws IOException 
{
     final int[] result = {-1};
     testLockAndRun(logSetLock, new TestCallWithWriteLock() {
       boolean test() {
@@ -163,8 +163,8 @@ public class TabletServerLogger {
   public void getLoggers(Set<String> loggersOut) {
     logSetLock.readLock().lock();
     try {
-      for (IRemoteLogger logger : loggers) {
-        loggersOut.add(logger.getLogger());
+      for (DfsLogger logger : loggers) {
+        loggersOut.add(logger.toString());
       }
     } finally {
       logSetLock.readLock().unlock();
@@ -205,7 +205,7 @@ public class TabletServerLogger {
       throw new IllegalStateException("close should be called with write lock 
held!");
     }
     try {
-      for (IRemoteLogger logger : loggers) {
+      for (DfsLogger logger : loggers) {
         try {
           logger.close();
         } catch (Throwable ex) {
@@ -220,7 +220,7 @@ public class TabletServerLogger {
   }
   
   interface Writer {
-    LoggerOperation write(IRemoteLogger logger, int seq) throws Exception;
+    LoggerOperation write(DfsLogger logger, int seq) throws Exception;
   }
   
   private int write(CommitSession commitSession, boolean mincFinish, Writer 
writer) throws IOException {
@@ -239,7 +239,7 @@ public class TabletServerLogger {
     while (!success) {
       try {
         // get a reference to the loggers that no other thread can touch
-        ArrayList<IRemoteLogger> copy = new ArrayList<IRemoteLogger>();
+        ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
         currentLogSet = initializeLoggers(copy);
         
         // add the logger to the log set for the memory in the tablet,
@@ -268,7 +268,7 @@ public class TabletServerLogger {
           if (seq < 0)
             throw new RuntimeException("Logger sequence generator wrapped!  
Onos!!!11!eleven");
           ArrayList<LoggerOperation> queuedOperations = new 
ArrayList<LoggerOperation>(copy.size());
-          for (IRemoteLogger wal : copy) {
+          for (DfsLogger wal : copy) {
             LoggerOperation lop = writer.write(wal, seq);
             if (lop != null)
               queuedOperations.add(lop);
@@ -330,7 +330,7 @@ public class TabletServerLogger {
       return -1;
     return write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(IRemoteLogger logger, int ignored) throws 
Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws 
Exception {
         logger.defineTablet(commitSession.getWALogSeq(), 
commitSession.getLogId(), commitSession.getExtent());
         return null;
       }
@@ -342,7 +342,7 @@ public class TabletServerLogger {
       return -1;
     int seq = write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(IRemoteLogger logger, int ignored) throws 
Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws 
Exception {
         return logger.log(tabletSeq, commitSession.getLogId(), m);
       }
     });
@@ -362,7 +362,7 @@ public class TabletServerLogger {
     
     int seq = write(loggables.keySet(), false, new Writer() {
       @Override
-      public LoggerOperation write(IRemoteLogger logger, int ignored) throws 
Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws 
Exception {
         List<TabletMutations> copy = new 
ArrayList<TabletMutations>(loggables.size());
         for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) 
{
           CommitSession cs = entry.getKey();
@@ -393,7 +393,7 @@ public class TabletServerLogger {
     
     int seq = write(commitSession, true, new Writer() {
       @Override
-      public LoggerOperation write(IRemoteLogger logger, int ignored) throws 
Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws 
Exception {
         logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), 
fullyQualifiedFileName);
         return null;
       }
@@ -409,7 +409,7 @@ public class TabletServerLogger {
       return -1;
     write(commitSession, false, new Writer() {
       @Override
-      public LoggerOperation write(IRemoteLogger logger, int ignored) throws 
Exception {
+      public LoggerOperation write(DfsLogger logger, int ignored) throws 
Exception {
         logger.minorCompactionStarted(seq, commitSession.getLogId(), 
fullyQualifiedFileName);
         return null;
       }

Modified: 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java?rev=1344396&r1=1344395&r2=1344396&view=diff
==============================================================================
--- 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java
 (original)
+++ 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/GetMasterStats.java
 Wed May 30 18:34:43 2012
@@ -108,16 +108,12 @@ public class GetMasterStats {
             out(4, "Queued for Minor Compaction %d", info.minor == null ? 0 : 
info.minor.queued);
           }
         }
-      }
-    }
-    if (stats.recovery != null && stats.recovery.size() > 0) {
-      out(0, "Recovery");
-      for (RecoveryStatus r : stats.recovery) {
-        out(1, "Log Server %s", r.host);
-        out(1, "Log Name %s", r.name);
-        out(1, "Map Progress: %.2f%%", r.mapProgress * 100);
-        out(1, "Reduce Progress: %.2f%%", r.reduceProgress * 100);
-        out(1, "Time running: %s", r.runtime / 1000.);
+        out(2, "Recoveries %d", server.logSorts.size());
+        for (RecoveryStatus sort : server.logSorts) {
+          out(3, "File %s", sort.name);
+          out(3, "Progress %.2f%%", sort.progress * 100);
+          out(3, "Time running %s", sort.runtime / 1000.);
+        }
       }
     }
   }

Modified: 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java?rev=1344396&r1=1344395&r2=1344396&view=diff
==============================================================================
--- 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
 (original)
+++ 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
 Wed May 30 18:34:43 2012
@@ -188,17 +188,6 @@ public class NullTserver {
     /*
      * (non-Javadoc)
      * 
-     * @see 
org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#sortLog(org.apache.accumulo.cloudtrace.thrift.TInfo,
-     * org.apache.accumulo.core.security.thrift.AuthInfo, java.lang.String)
-     */
-    @Override
-    public double sortLog(TInfo tinfo, AuthInfo credentials, String lock, 
String path) throws ThriftSecurityException, TException {
-      return 0;
-    }
-    
-    /*
-     * (non-Javadoc)
-     * 
      * @see 
org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.cloudtrace.thrift.TInfo,
      * org.apache.accumulo.core.security.thrift.AuthInfo, java.util.List)
      */

Modified: 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java?rev=1344396&r1=1344395&r2=1344396&view=diff
==============================================================================
--- 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java
 (original)
+++ 
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java
 Wed May 30 18:34:43 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.util.Progressab
 // If FileSystem was an interface, we could use a Proxy, but it's not, so we 
have to override everything manually
 
 public class TraceFileSystem extends FileSystem {
+
   @Override
   public void setConf(Configuration conf) {
     Span span = Trace.start("setConf");
@@ -667,6 +668,10 @@ public class TraceFileSystem extends Fil
     this.impl = impl;
   }
   
+  public FileSystem getImplementation() {
+    return impl;
+  }
+
   @Override
   public URI getUri() {
     Span span = Trace.start("getUri");


Reply via email to