Author: ecn
Date: Thu Jan 24 22:10:51 2013
New Revision: 1438225

URL: http://svn.apache.org/viewvc?rev=1438225&view=rev
Log:
ACCUMULO-985 cleanup batch scanners and batch writers when they are removed 
from the LRU

Modified:
    
accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java

Modified: 
accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java?rev=1438225&r1=1438224&r2=1438225&view=diff
==============================================================================
--- 
accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java 
(original)
+++ 
accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java 
Thu Jan 24 22:10:51 2013
@@ -83,20 +83,48 @@ import org.apache.accumulo.proxy.thrift.
 import org.apache.accumulo.proxy.thrift.UserPass;
 import org.apache.accumulo.proxy.thrift.WriterOptions;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 
 public class ProxyServer implements AccumuloProxy.Iface {
   
+  public static final Logger logger = Logger.getLogger(ProxyServer.class);
   protected Instance instance;
   
-  protected class ScannerPlusIterator {
+  static protected class ScannerPlusIterator {
     public ScannerBase scanner;
     public Iterator<Map.Entry<Key,Value>> iterator;
   }
   
+  static class CloseWriter implements RemovalListener<UUID,BatchWriter> {
+    @Override
+    public void onRemoval(RemovalNotification<UUID,BatchWriter> notification) {
+      try {
+        notification.getValue().close();
+      } catch (MutationsRejectedException e) {
+        logger.warn(e, e);
+      }
+    }
+    public CloseWriter() {}
+  }
+  
+  static class CloseScanner implements 
RemovalListener<UUID,ScannerPlusIterator> {
+    @Override
+    public void onRemoval(RemovalNotification<UUID,ScannerPlusIterator> 
notification) {
+      final ScannerBase base = notification.getValue().scanner;
+      if (base instanceof BatchScanner) {
+        final BatchScanner scanner = (BatchScanner)base;
+        scanner.close();
+      }
+    }
+    public CloseScanner() {}
+  }
+
   protected Cache<UUID,ScannerPlusIterator> scannerCache;
   protected Cache<UUID,BatchWriter> writerCache;
   
@@ -108,9 +136,9 @@ public class ProxyServer implements Accu
       instance = new 
ZooKeeperInstance(props.getProperty("org.apache.accumulo.proxy.ProxyServer.instancename"),
           
props.getProperty("org.apache.accumulo.proxy.ProxyServer.zookeepers"));
     
-    scannerCache = CacheBuilder.newBuilder().expireAfterAccess(10, 
TimeUnit.MINUTES).maximumSize(1000).build();
+    scannerCache = CacheBuilder.newBuilder().expireAfterAccess(10, 
TimeUnit.MINUTES).maximumSize(1000).removalListener(new CloseScanner()).build();
     
-    writerCache = CacheBuilder.newBuilder().expireAfterAccess(10, 
TimeUnit.MINUTES).maximumSize(1000).build();
+    writerCache = CacheBuilder.newBuilder().expireAfterAccess(10, 
TimeUnit.MINUTES).maximumSize(1000).removalListener(new CloseWriter()).build();
   }
   
   protected Connector getConnector(UserPass userpass) throws Exception {


Reply via email to