Changing the inuse directory cleanup system in the index importer to make use of the HDFS directory symlink cache.
Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9028b285 Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9028b285 Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9028b285 Branch: refs/heads/master Commit: 9028b285fb8388efc647d1f2034d6a12b3648e5c Parents: 279b106 Author: Aaron McCurry <amccu...@gmail.com> Authored: Wed Aug 12 11:29:41 2015 -0400 Committer: Aaron McCurry <amccu...@gmail.com> Committed: Wed Aug 12 11:29:41 2015 -0400 ---------------------------------------------------------------------- .../manager/writer/BlurIndexSimpleWriter.java | 2 +- .../blur/manager/writer/IndexImporter.java | 19 +++++++++++++++++-- .../blur/manager/writer/IndexImporterTest.java | 2 +- .../apache/blur/store/hdfs/HdfsDirectory.java | 4 ++-- 4 files changed, 21 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9028b285/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java index 3c12c9f..71da14a 100644 --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java @@ -290,7 +290,7 @@ public class BlurIndexSimpleWriter extends BlurIndex { _writer.notify(); } _indexImporter = new IndexImporter(_indexImporterTimer, BlurIndexSimpleWriter.this, _shardContext, - TimeUnit.SECONDS, 10, _thriftCache); + TimeUnit.SECONDS, 10, _thriftCache, _directory); } catch (IOException e) { LOG.error("Unknown error on index writer open.", e); } 
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9028b285/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java index d45876e..ad54366 100644 --- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java @@ -39,6 +39,7 @@ import org.apache.blur.manager.BlurPartitioner; import org.apache.blur.server.ShardContext; import org.apache.blur.server.TableContext; import org.apache.blur.server.cache.ThriftCache; +import org.apache.blur.store.hdfs.DirectoryDecorator; import org.apache.blur.store.hdfs.HdfsDirectory; import org.apache.blur.utils.BlurConstants; import org.apache.blur.utils.ShardUtil; @@ -78,15 +79,17 @@ public class IndexImporter extends TimerTask implements Closeable { private final long _cleanupDelay; private final Timer _inindexImporterTimer; private final ThriftCache _thriftCache; + private final HdfsDirectory _directory; private long _lastCleanup; private Runnable _testError; public IndexImporter(Timer indexImporterTimer, BlurIndex blurIndex, ShardContext shardContext, TimeUnit refreshUnit, - long refreshAmount, ThriftCache thriftCache) { + long refreshAmount, ThriftCache thriftCache, Directory dir) throws IOException { _thriftCache = thriftCache; _blurIndex = blurIndex; _shardContext = shardContext; + _directory = getHdfsDirectory(dir); long period = refreshUnit.toMillis(refreshAmount); indexImporterTimer.schedule(this, period, period); @@ -96,6 +99,17 @@ public class IndexImporter extends TimerTask implements Closeable { _cleanupDelay = TimeUnit.MINUTES.toMillis(10); } + private HdfsDirectory getHdfsDirectory(Directory dir) throws IOException { + if (dir instanceof HdfsDirectory) { + return 
(HdfsDirectory) dir; + } else if (dir instanceof DirectoryDecorator) { + DirectoryDecorator decorator = (DirectoryDecorator) dir; + return getHdfsDirectory(decorator.getOriginalDirectory()); + } else { + throw new IOException("Directory [" + dir + "] is not HdfsDirectory or DirectoryDecorator"); + } + } + @Override public void close() throws IOException { cancel(); @@ -362,7 +376,8 @@ public class IndexImporter extends TimerTask implements Closeable { }); for (FileStatus status : listStatus) { - Path realPath = HdfsDirectory.readRealPathDataFromSymlinkPath(fileSystem, status.getPath()); + String realFileName = HdfsDirectory.getRealFileName(status.getPath().getName()); + Path realPath = _directory.getRealFilePathFromSymlink(realFileName); Path inuseDir = inuseFileToDir.get(realPath); inuseDirs.remove(inuseDir); // if the inuse dir has an inprogress file then remove it because there http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9028b285/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java ---------------------------------------------------------------------- diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java index 11c9d77..cfac626 100644 --- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java +++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java @@ -122,7 +122,7 @@ public class IndexImporterTest { BufferStore.initNewBuffer(128, 128 * 128); _indexImporter = new IndexImporter(_timer, getBlurIndex(shardContext, _mainDirectory), shardContext, - TimeUnit.MINUTES, 10, null); + TimeUnit.MINUTES, 10, null, _mainDirectory); } private BlurIndex getBlurIndex(ShardContext shardContext, final Directory mainDirectory) throws IOException { 
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9028b285/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java ---------------------------------------------------------------------- diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java index 116cd72..cef7c11 100644 --- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java +++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java @@ -351,7 +351,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin }; } - protected String getRealFileName(String name) { + public static String getRealFileName(String name) { if (name.endsWith(LNK)) { int lastIndexOf = name.lastIndexOf(LNK); return name.substring(0, lastIndexOf); @@ -621,7 +621,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin return new Path(_path, name); } - protected Path getRealFilePathFromSymlink(String name) throws IOException { + public Path getRealFilePathFromSymlink(String name) throws IOException { // need to cache if (_useCache) { Path path = _symlinkPathMap.get(name);