Author: ecn
Date: Thu Jan 24 22:10:51 2013
New Revision: 1438225
URL: http://svn.apache.org/viewvc?rev=1438225&view=rev
Log:
ACCUMULO-985 cleanup batch scanners and batch writers when they are removed
from the LRU
Modified:
accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
Modified:
accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java?rev=1438225&r1=1438224&r2=1438225&view=diff
==============================================================================
---
accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
(original)
+++
accumulo/trunk/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
Thu Jan 24 22:10:51 2013
@@ -83,20 +83,48 @@ import org.apache.accumulo.proxy.thrift.
import org.apache.accumulo.proxy.thrift.UserPass;
import org.apache.accumulo.proxy.thrift.WriterOptions;
import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
public class ProxyServer implements AccumuloProxy.Iface {
+ public static final Logger logger = Logger.getLogger(ProxyServer.class);
protected Instance instance;
- protected class ScannerPlusIterator {
+ static protected class ScannerPlusIterator {
public ScannerBase scanner;
public Iterator<Map.Entry<Key,Value>> iterator;
}
+ static class CloseWriter implements RemovalListener<UUID,BatchWriter> {
+ @Override
+ public void onRemoval(RemovalNotification<UUID,BatchWriter> notification) {
+ try {
+ notification.getValue().close();
+ } catch (MutationsRejectedException e) {
+ logger.warn(e, e);
+ }
+ }
+ public CloseWriter() {}
+ }
+
+ static class CloseScanner implements
RemovalListener<UUID,ScannerPlusIterator> {
+ @Override
+ public void onRemoval(RemovalNotification<UUID,ScannerPlusIterator>
notification) {
+ final ScannerBase base = notification.getValue().scanner;
+ if (base instanceof BatchScanner) {
+ final BatchScanner scanner = (BatchScanner)base;
+ scanner.close();
+ }
+ }
+ public CloseScanner() {}
+ }
+
protected Cache<UUID,ScannerPlusIterator> scannerCache;
protected Cache<UUID,BatchWriter> writerCache;
@@ -108,9 +136,9 @@ public class ProxyServer implements Accu
instance = new
ZooKeeperInstance(props.getProperty("org.apache.accumulo.proxy.ProxyServer.instancename"),
props.getProperty("org.apache.accumulo.proxy.ProxyServer.zookeepers"));
- scannerCache = CacheBuilder.newBuilder().expireAfterAccess(10,
TimeUnit.MINUTES).maximumSize(1000).build();
+ scannerCache = CacheBuilder.newBuilder().expireAfterAccess(10,
TimeUnit.MINUTES).maximumSize(1000).removalListener(new CloseScanner()).build();
- writerCache = CacheBuilder.newBuilder().expireAfterAccess(10,
TimeUnit.MINUTES).maximumSize(1000).build();
+ writerCache = CacheBuilder.newBuilder().expireAfterAccess(10,
TimeUnit.MINUTES).maximumSize(1000).removalListener(new CloseWriter()).build();
}
protected Connector getConnector(UserPass userpass) throws Exception {