Repository: nifi Updated Branches: refs/heads/master a5d630672 -> 778ba3957
NIFI-3631: This closes #1613. Do not change Active Directory in IndexDirectoryManager when it becomes full. Instead, wait until it is committed and is removed by the onIndexCommitted method. This resolved a bug where the index can exceed the configured limit but not yet be committed and as a result would no longer be the active index. As a result, this bug causes the IndexWriter never to get closed/removed from the IndexManager, and so a memory leak is created Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/778ba395 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/778ba395 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/778ba395 Branch: refs/heads/master Commit: 778ba3957e7d1b9ddc105b1655f53e85cf6ec2ab Parents: a5d6306 Author: Mark Payne <[email protected]> Authored: Mon Mar 20 17:18:01 2017 -0400 Committer: joewitt <[email protected]> Committed: Fri Apr 7 14:42:18 2017 -0400 ---------------------------------------------------------------------- .../index/lucene/IndexDirectoryManager.java | 9 ++-- .../provenance/index/lucene/IndexLocation.java | 21 +------- .../index/lucene/TestIndexDirectoryManager.java | 51 +++++++++++++++++++- 3 files changed, 55 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/778ba395/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java index 09878ff..53f74e0 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexDirectoryManager.java @@ -75,7 +75,7 @@ public class IndexDirectoryManager { final long startTime = DirectoryUtils.getIndexTimestamp(indexDir); final List<IndexLocation> dirsForTimestamp = indexLocationByTimestamp.computeIfAbsent(startTime, t -> new ArrayList<>()); - final IndexLocation indexLoc = new IndexLocation(indexDir, startTime, partitionName, repoConfig.getDesiredIndexSize()); + final IndexLocation indexLoc = new IndexLocation(indexDir, startTime, partitionName); dirsForTimestamp.add(indexLoc); final Tuple<Long, IndexLocation> tuple = latestIndexByStorageDir.get(storageDir); @@ -99,8 +99,7 @@ public class IndexDirectoryManager { final Map.Entry<Long, List<IndexLocation>> entry = itr.next(); final List<IndexLocation> locations = entry.getValue(); - final IndexLocation locToRemove = new IndexLocation(directory, DirectoryUtils.getIndexTimestamp(directory), - directory.getName(), repoConfig.getDesiredIndexSize()); + final IndexLocation locToRemove = new IndexLocation(directory, DirectoryUtils.getIndexTimestamp(directory), directory.getName()); locations.remove(locToRemove); if (locations.isEmpty()) { itr.remove(); @@ -334,8 +333,8 @@ public class IndexDirectoryManager { */ public synchronized File getWritableIndexingDirectory(final long earliestTimestamp, final String partitionName) { IndexLocation indexLoc = activeIndices.get(partitionName); - if (indexLoc == null || indexLoc.isIndexFull()) { - indexLoc = new IndexLocation(createIndex(earliestTimestamp, partitionName), earliestTimestamp, partitionName, repoConfig.getDesiredIndexSize()); + if (indexLoc == null) { + indexLoc = new IndexLocation(createIndex(earliestTimestamp, partitionName), earliestTimestamp, partitionName); logger.debug("Created new Index Directory {}", indexLoc); indexLocationByTimestamp.computeIfAbsent(earliestTimestamp, t -> new ArrayList<>()).add(indexLoc); http://git-wip-us.apache.org/repos/asf/nifi/blob/778ba395/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java index 33867c6..f7de84f 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/IndexLocation.java @@ -18,24 +18,16 @@ package org.apache.nifi.provenance.index.lucene; import java.io.File; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.provenance.util.DirectoryUtils; public class IndexLocation { - private static final long SIZE_CHECK_MILLIS = TimeUnit.SECONDS.toMillis(30L); - private final File indexDirectory; private final long indexStartTimestamp; private final String partitionName; - private final long desiredIndexSize; - private volatile long lastSizeCheckTime = System.currentTimeMillis(); - public IndexLocation(final File indexDirectory, final long indexStartTimestamp, final String partitionName, final long desiredIndexSize) { + public IndexLocation(final File indexDirectory, final long indexStartTimestamp, final String partitionName) { this.indexDirectory = indexDirectory; this.indexStartTimestamp = indexStartTimestamp; this.partitionName = partitionName; - this.desiredIndexSize = desiredIndexSize; } public File getIndexDirectory() { @@ -50,17 +42,6 @@ public class IndexLocation { return partitionName; } - public boolean isIndexFull() { - final long now = System.currentTimeMillis(); - final long millisSinceLastSizeCheck = now - lastSizeCheckTime; - if (millisSinceLastSizeCheck < SIZE_CHECK_MILLIS) { - return false; - } - - lastSizeCheckTime = now; - return DirectoryUtils.getSize(indexDirectory) >= desiredIndexSize; - } - @Override public int hashCode() { return 31 + 41 * indexDirectory.hashCode(); http://git-wip-us.apache.org/repos/asf/nifi/blob/778ba395/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java index 3f3c422..efcb601 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/index/lucene/TestIndexDirectoryManager.java @@ -21,6 +21,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -82,12 +85,58 @@ public class TestIndexDirectoryManager { } + @Test + public void testActiveIndexNotLostWhenSizeExceeded() throws IOException, InterruptedException { + final RepositoryConfiguration config = createConfig(2); + config.setDesiredIndexSize(4096 * 128); + + final File storageDir1 = config.getStorageDirectories().get("1"); + final File storageDir2 = config.getStorageDirectories().get("2"); + + final File index1 = new File(storageDir1, "index-1"); + final File index2 = new File(storageDir1, "index-2"); + final File index3 = new File(storageDir2, "index-3"); + final File index4 = new File(storageDir2, "index-4"); + + final File[] allIndices = new File[] {index1, index2, index3, index4}; + for (final File file : allIndices) { + assertTrue(file.mkdirs() || file.exists()); + } + + try { + final IndexDirectoryManager mgr = new IndexDirectoryManager(config); + mgr.initialize(); + + File indexDir = mgr.getWritableIndexingDirectory(System.currentTimeMillis(), "1"); + final File newFile = new File(indexDir, "1.bin"); + try (final OutputStream fos = new FileOutputStream(newFile)) { + final byte[] data = new byte[4096]; + for (int i = 0; i < 1024; i++) { + fos.write(data); + } + } + + try { + final File newDir = mgr.getWritableIndexingDirectory(System.currentTimeMillis(), "1"); + assertEquals(indexDir, newDir); + } finally { + newFile.delete(); + } + } finally { + for (final File file : allIndices) { + file.delete(); + } + } + } + + + private IndexLocation createLocation(final long timestamp) { return createLocation(timestamp, "1"); } private IndexLocation createLocation(final long timestamp, final String partitionName) { - return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName, 1024 * 1024L); + return new IndexLocation(new File("index-" + timestamp), timestamp, partitionName); } private RepositoryConfiguration createConfig(final int partitions) {
