Fixing an infinite loop problem when a table is deleted before the writer is closed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/e4a02c08 Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/e4a02c08 Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/e4a02c08 Branch: refs/heads/v2_command Commit: e4a02c089c0a46bdf597abffeb0f68a831be3660 Parents: dda225e Author: Aaron McCurry <amccu...@gmail.com> Authored: Mon Dec 7 14:36:39 2015 -0500 Committer: Aaron McCurry <amccu...@gmail.com> Committed: Mon Dec 7 14:36:39 2015 -0500 ---------------------------------------------------------------------- .../apache/blur/manager/writer/IndexImporter.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e4a02c08/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java index 9786ea0..42171e8 100644 --- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java @@ -81,6 +81,7 @@ public class IndexImporter extends TimerTask implements Closeable { private final Timer _inindexImporterTimer; private final ThriftCache _thriftCache; private final HdfsDirectory _directory; + private final int MAX_ATTEMPTS = 10; private long _lastCleanup; private Runnable _testError; @@ -201,9 +202,13 @@ public class IndexImporter extends TimerTask implements Closeable { Configuration configuration = _shardContext.getTableContext().getConfiguration(); try { FileSystem fileSystem = path.getFileSystem(configuration); - SortedSet<FileStatus> listStatus; - while (true) { + SortedSet<FileStatus> listStatus = null; + for (int i = 0; i < MAX_ATTEMPTS; i++) { try { + if (!fileSystem.exists(path)) { + LOG.warn("Path [{0}] no longer exists, exiting.", path); + return; + } listStatus = sort(fileSystem.listStatus(path, new PathFilter() { @Override public boolean accept(Path path) { @@ -218,11 +223,15 @@ public class IndexImporter extends TimerTask implements Closeable { LOG.warn("File not found error, retrying."); } try { - Thread.sleep(100); + Thread.sleep(100 * (i + 1)); } catch (InterruptedException e) { return; } } + if (listStatus == null) { + LOG.warn("Could not get listing of path [{0}], exiting.", path); + return; + } for (FileStatus fileStatus : listStatus) { Path file = fileStatus.getPath(); if (fileStatus.isDir() && file.getName().endsWith(COMMIT)) {