Changing the error handling of the TimerTasks so that they keep operating when an unhandled runtime exception is thrown.
Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/dda225e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/dda225e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/dda225e9 Branch: refs/heads/v2_command Commit: dda225e972341410e2afbb39a17107ef495231c4 Parents: 55cc3f6 Author: Aaron McCurry <amccu...@gmail.com> Authored: Mon Dec 7 14:32:43 2015 -0500 Committer: Aaron McCurry <amccu...@gmail.com> Committed: Mon Dec 7 14:32:43 2015 -0500 ---------------------------------------------------------------------- .../apache/blur/command/BaseCommandManager.java | 30 +++++++++++--------- .../manager/indexserver/ShardStateManager.java | 12 ++++++++ .../manager/writer/BlurIndexSimpleWriter.java | 14 ++++++++- .../blur/manager/writer/IndexImporter.java | 3 ++ .../blur/thrift/BlurControllerServer.java | 4 +-- .../org/apache/blur/kvs/HdfsKeyValueStore.java | 4 +-- .../blur/store/blockcache_v2/MeterWrapper.java | 11 ++++++- .../apache/blur/store/hdfs/HdfsDirectory.java | 19 ++++++++----- .../java/org/apache/blur/thrift/ClientPool.java | 8 ++++-- .../apache/blur/memory/MemoryLeakDetector.java | 6 +++- 10 files changed, 82 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dda225e9/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java index 452a04d..edee060 100644 --- a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java +++ b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java @@ -133,20 +133,24 @@ public abstract class BaseCommandManager implements Closeable { return new 
TimerTask() { @Override public void run() { - Set<Entry<Long, ResponseFuture<?>>> entrySet = runningMap.entrySet(); - for (Entry<Long, ResponseFuture<?>> e : entrySet) { - Long instanceExecutionId = e.getKey(); - ResponseFuture<?> responseFuture = e.getValue(); - if (!responseFuture.isRunning() && responseFuture.hasExpired()) { - Command<?> commandExecuting = responseFuture.getCommandExecuting(); - String commandExecutionId = null; - if (commandExecuting != null) { - commandExecutionId = commandExecuting.getCommandExecutionId(); + try { + Set<Entry<Long, ResponseFuture<?>>> entrySet = runningMap.entrySet(); + for (Entry<Long, ResponseFuture<?>> e : entrySet) { + Long instanceExecutionId = e.getKey(); + ResponseFuture<?> responseFuture = e.getValue(); + if (!responseFuture.isRunning() && responseFuture.hasExpired()) { + Command<?> commandExecuting = responseFuture.getCommandExecuting(); + String commandExecutionId = null; + if (commandExecuting != null) { + commandExecutionId = commandExecuting.getCommandExecutionId(); + } + LOG.info("Removing old execution instance id [{0}] with command execution id of [{1}]", + instanceExecutionId, commandExecutionId); + runningMap.remove(instanceExecutionId); } - LOG.info("Removing old execution instance id [{0}] with command execution id of [{1}]", - instanceExecutionId, commandExecutionId); - runningMap.remove(instanceExecutionId); } + } catch (Throwable t) { + LOG.error("Unknown error.", t); } } }; @@ -187,7 +191,7 @@ public abstract class BaseCommandManager implements Closeable { Path path = new Path(_commandPath); FileSystem fileSystem = path.getFileSystem(_configuration); int changeCount = 0; - if(fileSystem.exists(path)) { + if (fileSystem.exists(path)) { FileStatus[] listStatus = fileSystem.listStatus(path); for (FileStatus fileStatus : listStatus) { BigInteger contentsCheck = checkContents(fileStatus, fileSystem); 
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dda225e9/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java index b507736..3505883 100644 --- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java +++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/ShardStateManager.java @@ -13,6 +13,8 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import org.apache.blur.log.Log; +import org.apache.blur.log.LogFactory; import org.apache.blur.thrift.generated.ShardState; /** @@ -39,6 +41,8 @@ import org.apache.blur.thrift.generated.ShardState; */ public class ShardStateManager implements Closeable { + private static final Log LOG = LogFactory.getLog(ShardStateManager.class); + private static final long _5_SECONDS = TimeUnit.SECONDS.toMillis(5); private static final long _60_SECONDS = TimeUnit.SECONDS.toMillis(60); private final Map<Key, Value> stateMap = new ConcurrentHashMap<Key, Value>(); @@ -49,6 +53,14 @@ public class ShardStateManager implements Closeable { timer.schedule(new TimerTask() { @Override public void run() { + try { + cleanup(); + } catch (Throwable t) { + LOG.error("Unkown error whiel trying to cleanup shard state manager.", t); + } + } + + private void cleanup() { Collection<Key> toBeDeleted = null; for (Entry<Key, Value> e : stateMap.entrySet()) { if (shouldBeRemoved(e)) { http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dda225e9/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java ---------------------------------------------------------------------- diff --git 
a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java index 1570362..20a7440 100644 --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java @@ -226,6 +226,14 @@ public class BlurIndexSimpleWriter extends BlurIndex { _watchForIdleBulkWriters = new TimerTask() { @Override public void run() { + try { + watchForIdleBulkWriters(); + } catch (Throwable t) { + LOG.error("Unknown error.", t); + } + } + + private void watchForIdleBulkWriters() { for (BulkEntry bulkEntry : _bulkWriters.values()) { bulkEntry._lock.lock(); try { @@ -248,7 +256,11 @@ public class BlurIndexSimpleWriter extends BlurIndex { _watchForIdleWriter = new TimerTask() { @Override public void run() { - closeWriter(); + try { + closeWriter(); + } catch (Throwable t) { + LOG.error("Unknown error while trying to close idle writer.", t); + } } }; _indexWriterTimer.schedule(_watchForIdleWriter, _maxWriterIdle, _maxWriterIdle); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dda225e9/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java index c69c71b..9786ea0 100644 --- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java @@ -259,9 +259,12 @@ public class IndexImporter extends TimerTask implements Closeable { } catch (IOException e) { LOG.error("Unknown error while trying to refresh imports on [{1}/{2}].", e, _shard, _table); } + } catch (Throwable t) { + LOG.error("Unknown error while tyring to run index importer.", t); 
} finally { _globalLock.unlock(); } + } private void touch(FileSystem fileSystem, Path path) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dda225e9/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java index b24bb03..8ee32af 100644 --- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java +++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java @@ -236,7 +236,7 @@ public class BlurControllerServer extends TableAdmin implements Iface { public void run() { try { tableContextWarmup(); - } catch (Exception e) { + } catch (Throwable e) { LOG.error("Unknown error while trying to preconnect to shard servers.", e); } } @@ -267,7 +267,7 @@ public class BlurControllerServer extends TableAdmin implements Iface { public void run() { try { preconnectClients(); - } catch (Exception e) { + } catch (Throwable e) { LOG.error("Unknown error while trying to preconnect to shard servers.", e); } } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dda225e9/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java ---------------------------------------------------------------------- diff --git a/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java b/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java index 227d620..ace65cc 100644 --- a/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java +++ b/blur-kvs/src/main/java/org/apache/blur/kvs/HdfsKeyValueStore.java @@ -220,7 +220,7 @@ public class HdfsKeyValueStore implements Store { public void run() { try { cleanupOldFiles(); - } catch (IOException e) { + } catch (Throwable e) { LOG.error("Unknown error while trying to clean up old files.", e); } } @@ -233,7 
+233,7 @@ public class HdfsKeyValueStore implements Store { public void run() { try { closeLogFileIfIdle(); - } catch (IOException e) { + } catch (Throwable e) { LOG.error("Unknown error while trying to close output file.", e); } } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dda225e9/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java index d608e14..2e8d245 100644 --- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java +++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/MeterWrapper.java @@ -29,10 +29,15 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.blur.log.Log; +import org.apache.blur.log.LogFactory; + import com.yammer.metrics.core.Meter; public abstract class MeterWrapper implements Closeable { + private static final Log LOG = LogFactory.getLog(MeterWrapper.class); + public static final MeterWrapper NOTHING = new MeterWrapper() { @Override public void mark() { @@ -95,7 +100,11 @@ public abstract class MeterWrapper implements Closeable { return new TimerTask() { @Override public void run() { - updateMetrics(); + try { + updateMetrics(); + } catch (Throwable t) { + LOG.error("Unknown error.", t); + } } }; } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dda225e9/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java index 6bcba98..2e457fd 100644 --- 
a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java +++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java @@ -353,13 +353,17 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin return new TimerTask() { @Override public void run() { - while (true) { - Closeable closeable = CLOSING_QUEUE.poll(); - if (closeable == null) { - return; + try { + while (true) { + Closeable closeable = CLOSING_QUEUE.poll(); + if (closeable == null) { + return; + } + LOG.info("Closing [{0}] [{1}]", System.identityHashCode(closeable), closeable); + org.apache.hadoop.io.IOUtils.cleanup(LOG, closeable); } - LOG.info("Closing [{0}] [{1}]", System.identityHashCode(closeable), closeable); - org.apache.hadoop.io.IOUtils.cleanup(LOG, closeable); + } catch (Throwable t) { + LOG.error("Unknown error.", t); } } }; @@ -478,7 +482,8 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin } long fileLength = fileLength(name); Path path = getPath(name); - FSInputFileHandle fsInputFileHandle = new FSInputFileHandle(_fileSystem, path, fileLength, name, _resourceTracking, _asyncClosing && _useCache); + FSInputFileHandle fsInputFileHandle = new FSInputFileHandle(_fileSystem, path, fileLength, name, _resourceTracking, + _asyncClosing && _useCache); HdfsIndexInput input = new HdfsIndexInput(this, fsInputFileHandle, fileLength, _metricsGroup, name, _sequentialReadControl.clone()); return input; http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dda225e9/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java ---------------------------------------------------------------------- diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java index 9cafd96..537f2fd 100644 --- a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java +++ b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java @@ 
-100,8 +100,12 @@ public class ClientPool { master.schedule(new TimerTask() { @Override public void run() { - for (Entry<Connection, BlockingQueue<Client>> e : _connMap.entrySet()) { - testConnections(e.getKey(), e.getValue()); + try { + for (Entry<Connection, BlockingQueue<Client>> e : _connMap.entrySet()) { + testConnections(e.getKey(), e.getValue()); + } + } catch (Throwable t) { + LOG.error("Unknown error while trying to clean up connections.", t); } } }, getClientPoolCleanFrequency(), getClientPoolCleanFrequency()); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dda225e9/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java ---------------------------------------------------------------------- diff --git a/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java b/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java index 391a76d..28ebce4 100644 --- a/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java +++ b/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java @@ -68,7 +68,11 @@ public class MemoryLeakDetector { _timer.schedule(new TimerTask() { @Override public void run() { - dump(); + try { + dump(); + } catch (Throwable t) { + LOG.error("Unknown error.", t); + } } }, TimeUnit.SECONDS.toMillis(10), TimeUnit.SECONDS.toMillis(10)); }