http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java index 5822fc5..c301bf3 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java @@ -1,357 +1,357 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.hadoop; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Progressable; -import org.apache.nifi.annotation.notification.PrimaryNodeState; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; -import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class TestListHDFS { - - private TestRunner runner; - private ListHDFSWithMockedFileSystem proc; - private MockCacheClient service; - - @Before - public void setup() throws InitializationException { - proc = new ListHDFSWithMockedFileSystem(); - runner = TestRunners.newTestRunner(proc); - - service = new MockCacheClient(); - runner.addControllerService("service", service); - runner.enableControllerService(service); - - 
runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml"); - runner.setProperty(ListHDFS.DIRECTORY, "/test"); - runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service"); - } - - @Test - public void testListingHasCorrectAttributes() { - proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); - - runner.run(); - - runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); - final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0); - mff.assertAttributeEquals("path", "/test"); - mff.assertAttributeEquals("filename", "testFile.txt"); - } - - - @Test - public void testRecursive() { - proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); - - proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir"))); - proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt"))); - - runner.run(); - - runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); - - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS); - for (int i=0; i < 2; i++) { - final MockFlowFile ff = flowFiles.get(i); - final String filename = ff.getAttribute("filename"); - - if (filename.equals("testFile.txt")) { - ff.assertAttributeEquals("path", "/test"); - } else if ( filename.equals("1.txt")) { - ff.assertAttributeEquals("path", "/test/testDir"); - } else { - Assert.fail("filename was " + filename); - } - } - } - - @Test - public void testNotRecursive() { - runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false"); - proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); - - proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir"))); - proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt"))); - - runner.run(); - - runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); - - final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0); - mff1.assertAttributeEquals("path", "/test"); - mff1.assertAttributeEquals("filename", "testFile.txt"); - } - - - @Test - public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() { - proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); - - runner.run(); - - runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); - - final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0); - mff1.assertAttributeEquals("path", "/test"); - mff1.assertAttributeEquals("filename", "testFile.txt"); - - runner.clearTransferState(); - - // add new file to pull - proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt"))); - - // trigger primary node change - proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE); - - // cause calls to service to fail - 
service.failOnCalls = true; - - runner.run(); - runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); - - runner.run(); - runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); - - final String key = proc.getKey("/test"); - - // wait just to a bit to ensure that the timestamp changes when we update the service - final Object curVal = service.values.get(key); - try { - Thread.sleep(10L); - } catch (final InterruptedException ie) { - } - - service.failOnCalls = false; - runner.run(); - runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); - - // ensure state saved both locally & remotely - assertTrue(proc.localStateSaved); - assertNotNull(service.values.get(key)); - assertNotSame(curVal, service.values.get(key)); - } - - - private FsPermission create777() { - return new FsPermission((short) 0777); - } - - - private class ListHDFSWithMockedFileSystem extends ListHDFS { - private final MockFileSystem fileSystem = new MockFileSystem(); - private boolean localStateSaved = false; - - @Override - protected FileSystem getFileSystem() { - return fileSystem; - } - - @Override - protected File getPersistenceFile() { - return new File("target/conf/state-file"); - } - - @Override - protected FileSystem getFileSystem(final Configuration config) throws IOException { - return fileSystem; - } - - @Override - protected void persistLocalState(final String directory, final String serializedState) throws IOException { - super.persistLocalState(directory, serializedState); - localStateSaved = true; - } - } - - - private class MockFileSystem extends FileSystem { - private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>(); - - public void addFileStatus(final Path parent, final FileStatus child) { - Set<FileStatus> children = fileStatuses.get(parent); - if ( children == null ) { - children = new HashSet<>(); - fileStatuses.put(parent, children); - } - - children.add(child); - } - - - @Override - public long getDefaultBlockSize() { - return 1024L; - } - - @Override - public short getDefaultReplication() { - return 1; - } - - @Override - public URI getUri() { - return null; - } - - @Override - public FSDataInputStream open(final Path f, final int bufferSize) throws IOException { - return null; - } - - @Override - public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, - final long blockSize, final Progressable progress) throws IOException { - return null; - } - - @Override - public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException { - return null; - } - - @Override - public boolean rename(final Path src, final Path dst) throws IOException { - return false; - } - - @Override - public boolean delete(final Path f, final boolean recursive) throws IOException { - return false; - } - - @Override - public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException { - final Set<FileStatus> statuses = fileStatuses.get(f); - if ( statuses == null ) { - return new FileStatus[0]; - } - - return statuses.toArray(new FileStatus[statuses.size()]); - } - - @Override - public void setWorkingDirectory(final Path new_dir) { - - } - - @Override - public Path getWorkingDirectory() { - return new Path(new File(".").getAbsolutePath()); - } - - @Override - public boolean mkdirs(final Path f, final FsPermission permission) throws IOException { - return false; - } - - @Override - public FileStatus getFileStatus(final Path f) throws 
IOException { - return null; - } - - } - - - private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { - private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>(); - private boolean failOnCalls = false; - - private void verifyNotFail() throws IOException { - if ( failOnCalls ) { - throw new IOException("Could not call to remote service because Unit Test marked service unavailable"); - } - } - - @Override - public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { - verifyNotFail(); - final Object retValue = values.putIfAbsent(key, value); - return (retValue == null); - } - - @Override - @SuppressWarnings("unchecked") - public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, - final Deserializer<V> valueDeserializer) throws IOException { - verifyNotFail(); - return (V) values.putIfAbsent(key, value); - } - - @Override - public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { - verifyNotFail(); - return values.containsKey(key); - } - - @Override - public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { - verifyNotFail(); - values.put(key, value); - } - - @Override - @SuppressWarnings("unchecked") - public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { - verifyNotFail(); - return (V) values.get(key); - } - - @Override - public void close() throws IOException { - } - - @Override - public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException { - verifyNotFail(); - values.remove(key); - return true; - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestListHDFS { + + private TestRunner runner; + private ListHDFSWithMockedFileSystem proc; + private MockCacheClient service; + + @Before + public void setup() throws InitializationException { + proc = new ListHDFSWithMockedFileSystem(); + runner = TestRunners.newTestRunner(proc); + + service = new MockCacheClient(); + runner.addControllerService("service", service); + runner.enableControllerService(service); + + runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml"); + runner.setProperty(ListHDFS.DIRECTORY, "/test"); + runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service"); + } + + @Test + public void testListingHasCorrectAttributes() { + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); + final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0); + mff.assertAttributeEquals("path", "/test"); + mff.assertAttributeEquals("filename", "testFile.txt"); + } + + + @Test + public void testRecursive() { + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt"))); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); + + final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS); + for (int i=0; i < 2; i++) { + final MockFlowFile ff = flowFiles.get(i); + final String filename = ff.getAttribute("filename"); + + if (filename.equals("testFile.txt")) { + ff.assertAttributeEquals("path", "/test"); + } else 
if ( filename.equals("1.txt")) { + ff.assertAttributeEquals("path", "/test/testDir"); + } else { + Assert.fail("filename was " + filename); + } + } + } + + @Test + public void testNotRecursive() { + runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false"); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt"))); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); + + final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0); + mff1.assertAttributeEquals("path", "/test"); + mff1.assertAttributeEquals("filename", "testFile.txt"); + } + + + @Test + public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() { + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); + + final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0); + mff1.assertAttributeEquals("path", "/test"); + mff1.assertAttributeEquals("filename", "testFile.txt"); + + runner.clearTransferState(); + + // add new file to pull + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt"))); + + // trigger primary node change + proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE); + + // cause calls to service to fail + service.failOnCalls = true; + + runner.run(); + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); + + runner.run(); + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); + + final String key = proc.getKey("/test"); + + // wait just to a bit to ensure that the timestamp changes when we update the service + final Object curVal = service.values.get(key); + try { + Thread.sleep(10L); + } catch (final InterruptedException ie) { + } + + service.failOnCalls = false; + runner.run(); + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); + + // ensure state saved both locally & remotely + assertTrue(proc.localStateSaved); + assertNotNull(service.values.get(key)); + assertNotSame(curVal, service.values.get(key)); + } + + + private FsPermission create777() { + return new FsPermission((short) 0777); + } + + + private class ListHDFSWithMockedFileSystem extends ListHDFS { + private final MockFileSystem fileSystem = new MockFileSystem(); + private boolean localStateSaved = false; + + @Override + protected FileSystem getFileSystem() { + return fileSystem; + } + + @Override + protected File getPersistenceFile() { + return new File("target/conf/state-file"); + } + + @Override + protected FileSystem getFileSystem(final Configuration config) throws IOException { + return fileSystem; + } + + @Override + protected void persistLocalState(final String directory, final String serializedState) throws IOException { + super.persistLocalState(directory, serializedState); + localStateSaved = true; + } + } + + + private class MockFileSystem extends FileSystem { + private final Map<Path, Set<FileStatus>> fileStatuses = 
new HashMap<>(); + + public void addFileStatus(final Path parent, final FileStatus child) { + Set<FileStatus> children = fileStatuses.get(parent); + if ( children == null ) { + children = new HashSet<>(); + fileStatuses.put(parent, children); + } + + children.add(child); + } + + + @Override + public long getDefaultBlockSize() { + return 1024L; + } + + @Override + public short getDefaultReplication() { + return 1; + } + + @Override + public URI getUri() { + return null; + } + + @Override + public FSDataInputStream open(final Path f, final int bufferSize) throws IOException { + return null; + } + + @Override + public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, + final long blockSize, final Progressable progress) throws IOException { + return null; + } + + @Override + public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException { + return null; + } + + @Override + public boolean rename(final Path src, final Path dst) throws IOException { + return false; + } + + @Override + public boolean delete(final Path f, final boolean recursive) throws IOException { + return false; + } + + @Override + public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException { + final Set<FileStatus> statuses = fileStatuses.get(f); + if ( statuses == null ) { + return new FileStatus[0]; + } + + return statuses.toArray(new FileStatus[statuses.size()]); + } + + @Override + public void setWorkingDirectory(final Path new_dir) { + + } + + @Override + public Path getWorkingDirectory() { + return new Path(new File(".").getAbsolutePath()); + } + + @Override + public boolean mkdirs(final Path f, final FsPermission permission) throws IOException { + return false; + } + + @Override + public FileStatus getFileStatus(final Path f) throws IOException { + return null; + } + + } + + + private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { + private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>(); + private boolean failOnCalls = false; + + private void verifyNotFail() throws IOException { + if ( failOnCalls ) { + throw new IOException("Could not call to remote service because Unit Test marked service unavailable"); + } + } + + @Override + public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + verifyNotFail(); + final Object retValue = values.putIfAbsent(key, value); + return (retValue == null); + } + + @Override + @SuppressWarnings("unchecked") + public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, + final Deserializer<V> valueDeserializer) throws IOException { + verifyNotFail(); + return (V) values.putIfAbsent(key, value); + } + + @Override + public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { + verifyNotFail(); + return values.containsKey(key); + } + + @Override + public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + verifyNotFail(); + values.put(key, value); + } + + @Override + @SuppressWarnings("unchecked") + public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { + verifyNotFail(); + return (V) 
values.get(key); + } + + @Override + public void close() throws IOException { + } + + @Override + public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException { + verifyNotFail(); + values.remove(key); + return true; + } + } +}
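
The MockCacheClient in the diff above drives the primary-node-change test by toggling a failOnCalls flag so that every cache operation throws IOException until the flag is cleared, simulating a remote-service outage. Below is a minimal sketch of that failure-injection idiom, reduced to a plain key-value store; the FlakyStore name and shape are illustrative only, not NiFi API.

    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class FlakyStore {
        private final ConcurrentMap<String, String> values = new ConcurrentHashMap<>();
        volatile boolean failOnCalls = false;   // flip on to simulate an outage

        private void verifyNotFail() throws IOException {
            if (failOnCalls) {
                throw new IOException("Simulated remote-service outage");
            }
        }

        String get(final String key) throws IOException {
            verifyNotFail();            // every operation checks the flag first
            return values.get(key);
        }

        void put(final String key, final String value) throws IOException {
            verifyNotFail();
            values.put(key, value);
        }
    }

Because the flag is checked before the backing map is touched, a test can assert that no state leaks out while the service is "down" and that normal behavior resumes as soon as the flag is cleared.
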
http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java index 31f31a5..57d0d78 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java @@ -1,512 +1,512 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.provenance.lucene; - -import java.io.Closeable; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.standard.StandardAnalyzer; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class IndexManager implements Closeable { - private static final Logger logger = LoggerFactory.getLogger(IndexManager.class); - - private final Lock lock = new ReentrantLock(); - private final Map<File, IndexWriterCount> writerCounts = new HashMap<>(); - private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>(); - - - public void removeIndex(final File indexDirectory) { - final File absoluteFile = indexDirectory.getAbsoluteFile(); - logger.info("Removing index {}", indexDirectory); - - lock.lock(); - try { - final IndexWriterCount count = writerCounts.remove(absoluteFile); - if ( count != null ) { - try { - count.close(); - } catch (final IOException ioe) { - logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } - } - } - - for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) { - for ( final ActiveIndexSearcher searcher : searcherList ) { - try { - searcher.close(); - } catch (final IOException ioe) { - logger.warn("Failed to close Index Searcher {} for {} due to {}", - searcher.getSearcher(), absoluteFile, ioe); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } - } - } - } - } finally { - lock.unlock(); - } - } - - public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException { - final File absoluteFile = indexingDirectory.getAbsoluteFile(); - logger.debug("Borrowing index writer for {}", indexingDirectory); - - lock.lock(); - try { - IndexWriterCount writerCount = writerCounts.remove(absoluteFile); - if ( writerCount == null ) { - final List<Closeable> closeables = new ArrayList<>(); - final Directory directory = FSDirectory.open(indexingDirectory); - closeables.add(directory); - - try { - final Analyzer analyzer = new StandardAnalyzer(); - closeables.add(analyzer); - - final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer); - config.setWriteLockTimeout(300000L); - - final IndexWriter indexWriter = new IndexWriter(directory, config); - writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1); - logger.debug("Providing new index writer for {}", indexingDirectory); - } catch (final IOException ioe) { - for ( final Closeable closeable : closeables ) { - try { - closeable.close(); - } catch (final IOException ioe2) { - ioe.addSuppressed(ioe2); - } - } - - throw ioe; - } - - writerCounts.put(absoluteFile, writerCount); - - // Mark any active searchers as poisoned because we are updating the index - final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile); - if ( searchers != null ) { - for 
(final ActiveIndexSearcher activeSearcher : searchers) { - activeSearcher.poison(); - } - } - } else { - logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1); - writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), - writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); - } - - return writerCount.getWriter(); - } finally { - lock.unlock(); - } - } - - public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) { - final File absoluteFile = indexingDirectory.getAbsoluteFile(); - logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory); - - lock.lock(); - try { - final IndexWriterCount count = writerCounts.remove(absoluteFile); - - try { - if ( count == null ) { - logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. " - + "This could potentially lead to a resource leak", writer, indexingDirectory); - writer.close(); - } else if ( count.getCount() <= 1 ) { - // we are finished with this writer. - logger.debug("Closing Index Writer for {}", indexingDirectory); - count.close(); - } else { - // decrement the count. - logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1); - writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1)); - } - } catch (final IOException ioe) { - logger.warn("Failed to close Index Writer {} due to {}", writer, ioe); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } - } - } finally { - lock.unlock(); - } - } - - - public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException { - final File absoluteFile = indexDir.getAbsoluteFile(); - logger.debug("Borrowing index searcher for {}", indexDir); - - lock.lock(); - try { - // check if we already have a reader cached. - List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile); - if ( currentlyCached == null ) { - currentlyCached = new ArrayList<>(); - activeSearchers.put(absoluteFile, currentlyCached); - } else { - // keep track of any searchers that have been closed so that we can remove them - // from our cache later. - final Set<ActiveIndexSearcher> expired = new HashSet<>(); - - try { - for ( final ActiveIndexSearcher searcher : currentlyCached ) { - if ( searcher.isCache() ) { - // if the searcher is poisoned, we want to close and expire it. - if ( searcher.isPoisoned() ) { - logger.debug("Index Searcher for {} is poisoned; removing cached searcher", absoluteFile); - expired.add(searcher); - continue; - } - - // if there are no references to the reader, it will have been closed. Since there is no - // isClosed() method, this is how we determine whether it's been closed or not. 
- final int refCount = searcher.getSearcher().getIndexReader().getRefCount(); - if ( refCount <= 0 ) { - // if refCount == 0, then the reader has been closed, so we need to discard the searcher - logger.debug("Reference count for cached Index Searcher for {} is currently {}; " - + "removing cached searcher", absoluteFile, refCount); - expired.add(searcher); - continue; - } - - logger.debug("Providing previously cached index searcher for {}", indexDir); - return searcher.getSearcher(); - } - } - } finally { - // if we have any expired index searchers, we need to close them and remove them - // from the cache so that we don't try to use them again later. - for ( final ActiveIndexSearcher searcher : expired ) { - try { - searcher.close(); - } catch (final Exception e) { - logger.debug("Failed to close 'expired' IndexSearcher {}", searcher); - } - - currentlyCached.remove(searcher); - } - } - } - - final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); - if ( writerCount == null ) { - final Directory directory = FSDirectory.open(absoluteFile); - logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir); - - try { - final DirectoryReader directoryReader = DirectoryReader.open(directory); - final IndexSearcher searcher = new IndexSearcher(directoryReader); - - // we want to cache the searcher that we create, since it's just a reader. - final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true); - currentlyCached.add(cached); - - return cached.getSearcher(); - } catch (final IOException e) { - try { - directory.close(); - } catch (final IOException ioe) { - e.addSuppressed(ioe); - } - - throw e; - } - } else { - logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing " - + "counter to {}", indexDir, writerCount.getCount() + 1); - - // increment the writer count to ensure that it's kept open. - writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), - writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); - - // create a new Index Searcher from the writer so that we don't have an issue with trying - // to read from a directory that's locked. If we get the "no segments* file found" with - // Lucene, this indicates that an IndexWriter already has the directory open. - final IndexWriter writer = writerCount.getWriter(); - final DirectoryReader directoryReader = DirectoryReader.open(writer, false); - final IndexSearcher searcher = new IndexSearcher(directoryReader); - - // we don't want to cache this searcher because it's based on a writer, so we want to get - // new values the next time that we search. - final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false); - - currentlyCached.add(activeSearcher); - return activeSearcher.getSearcher(); - } - } finally { - lock.unlock(); - } - } - - - public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) { - final File absoluteFile = indexDirectory.getAbsoluteFile(); - logger.debug("Returning index searcher for {} to IndexManager", indexDirectory); - - lock.lock(); - try { - // check if we already have a reader cached. 
- final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile); - if ( currentlyCached == null ) { - logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could " - + "result in a resource leak", indexDirectory); - return; - } - - // Check if the given searcher is in our list. We use an Iterator to do this so that if we - // find it we can call remove() on the iterator if need be. - final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator(); - while (itr.hasNext()) { - final ActiveIndexSearcher activeSearcher = itr.next(); - if ( activeSearcher.getSearcher().equals(searcher) ) { - if ( activeSearcher.isCache() ) { - // if the searcher is poisoned, close it and remove from "pool". - if ( activeSearcher.isPoisoned() ) { - itr.remove(); - - try { - logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory); - activeSearcher.close(); - } catch (final IOException ioe) { - logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } - } - - return; - } else { - // the searcher is cached. Just leave it open. - logger.debug("Index searcher for {} is cached; leaving open", indexDirectory); - return; - } - } else { - // searcher is not cached. It was created from a writer, and we want - // the newest updates the next time that we get a searcher, so we will - // go ahead and close this one out. - itr.remove(); - - // decrement the writer count because we incremented it when creating the searcher - final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); - if ( writerCount != null ) { - if ( writerCount.getCount() <= 1 ) { - try { - logger.debug("Index searcher for {} is not cached. Writer count is " - + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1); - - writerCount.close(); - } catch (final IOException ioe) { - logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } - } - } else { - logger.debug("Index searcher for {} is not cached. 
Writer count is decremented " - + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1); - - writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), - writerCount.getAnalyzer(), writerCount.getDirectory(), - writerCount.getCount() - 1)); - } - } - - try { - logger.debug("Closing Index Searcher for {}", indexDirectory); - activeSearcher.close(); - } catch (final IOException ioe) { - logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); - if ( logger.isDebugEnabled() ) { - logger.warn("", ioe); - } - } - } - } - } - } finally { - lock.unlock(); - } - } - - @Override - public void close() throws IOException { - logger.debug("Closing Index Manager"); - - lock.lock(); - try { - IOException ioe = null; - - for ( final IndexWriterCount count : writerCounts.values() ) { - try { - count.close(); - } catch (final IOException e) { - if ( ioe == null ) { - ioe = e; - } else { - ioe.addSuppressed(e); - } - } - } - - for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) { - for (final ActiveIndexSearcher searcher : searcherList) { - try { - searcher.close(); - } catch (final IOException e) { - if ( ioe == null ) { - ioe = e; - } else { - ioe.addSuppressed(e); - } - } - } - } - - if ( ioe != null ) { - throw ioe; - } - } finally { - lock.unlock(); - } - } - - - private static void close(final Closeable... closeables) throws IOException { - IOException ioe = null; - for ( final Closeable closeable : closeables ) { - if ( closeable == null ) { - continue; - } - - try { - closeable.close(); - } catch (final IOException e) { - if ( ioe == null ) { - ioe = e; - } else { - ioe.addSuppressed(e); - } - } - } - - if ( ioe != null ) { - throw ioe; - } - } - - - private static class ActiveIndexSearcher implements Closeable { - private final IndexSearcher searcher; - private final DirectoryReader directoryReader; - private final Directory directory; - private final boolean cache; - private boolean poisoned = false; - - public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader, - final Directory directory, final boolean cache) { - this.searcher = searcher; - this.directoryReader = directoryReader; - this.directory = directory; - this.cache = cache; - } - - public boolean isCache() { - return cache; - } - - public IndexSearcher getSearcher() { - return searcher; - } - - public boolean isPoisoned() { - return poisoned; - } - - public void poison() { - this.poisoned = true; - } - - @Override - public void close() throws IOException { - IndexManager.close(directoryReader, directory); - } - } - - - private static class IndexWriterCount implements Closeable { - private final IndexWriter writer; - private final Analyzer analyzer; - private final Directory directory; - private final int count; - - public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) { - this.writer = writer; - this.analyzer = analyzer; - this.directory = directory; - this.count = count; - } - - public Analyzer getAnalyzer() { - return analyzer; - } - - public Directory getDirectory() { - return directory; - } - - public IndexWriter getWriter() { - return writer; - } - - public int getCount() { - return count; - } - - @Override - public void close() throws IOException { - IndexManager.close(writer, analyzer, directory); - } - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.lucene; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IndexManager implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(IndexManager.class); + + private final Lock lock = new ReentrantLock(); + private final Map<File, IndexWriterCount> writerCounts = new HashMap<>(); + private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>(); + + + public void removeIndex(final File indexDirectory) { + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.info("Removing index {}", indexDirectory); + + lock.lock(); + try { + final IndexWriterCount count = writerCounts.remove(absoluteFile); + if ( count != null ) { + try { + count.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + + for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) { + for ( final ActiveIndexSearcher searcher : searcherList ) { + try { + searcher.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Searcher {} for {} due to {}", + searcher.getSearcher(), absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + } + } finally { + lock.unlock(); + } + } + + public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException { + final File absoluteFile = indexingDirectory.getAbsoluteFile(); + logger.debug("Borrowing index writer for {}", indexingDirectory); + + lock.lock(); + try { + IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + if ( writerCount == null ) { + final List<Closeable> closeables = new ArrayList<>(); + final Directory directory = FSDirectory.open(indexingDirectory); + closeables.add(directory); + + try { + final Analyzer analyzer = new StandardAnalyzer(); + closeables.add(analyzer); + + final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer); + 
config.setWriteLockTimeout(300000L); + + final IndexWriter indexWriter = new IndexWriter(directory, config); + writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1); + logger.debug("Providing new index writer for {}", indexingDirectory); + } catch (final IOException ioe) { + for ( final Closeable closeable : closeables ) { + try { + closeable.close(); + } catch (final IOException ioe2) { + ioe.addSuppressed(ioe2); + } + } + + throw ioe; + } + + writerCounts.put(absoluteFile, writerCount); + + // Mark any active searchers as poisoned because we are updating the index + final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile); + if ( searchers != null ) { + for (final ActiveIndexSearcher activeSearcher : searchers) { + activeSearcher.poison(); + } + } + } else { + logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1); + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); + } + + return writerCount.getWriter(); + } finally { + lock.unlock(); + } + } + + public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) { + final File absoluteFile = indexingDirectory.getAbsoluteFile(); + logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory); + + lock.lock(); + try { + final IndexWriterCount count = writerCounts.remove(absoluteFile); + + try { + if ( count == null ) { + logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. " + + "This could potentially lead to a resource leak", writer, indexingDirectory); + writer.close(); + } else if ( count.getCount() <= 1 ) { + // we are finished with this writer. + logger.debug("Closing Index Writer for {}", indexingDirectory); + count.close(); + } else { + // decrement the count. + logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1); + writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1)); + } + } catch (final IOException ioe) { + logger.warn("Failed to close Index Writer {} due to {}", writer, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } finally { + lock.unlock(); + } + } + + + public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException { + final File absoluteFile = indexDir.getAbsoluteFile(); + logger.debug("Borrowing index searcher for {}", indexDir); + + lock.lock(); + try { + // check if we already have a reader cached. + List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile); + if ( currentlyCached == null ) { + currentlyCached = new ArrayList<>(); + activeSearchers.put(absoluteFile, currentlyCached); + } else { + // keep track of any searchers that have been closed so that we can remove them + // from our cache later. + final Set<ActiveIndexSearcher> expired = new HashSet<>(); + + try { + for ( final ActiveIndexSearcher searcher : currentlyCached ) { + if ( searcher.isCache() ) { + // if the searcher is poisoned, we want to close and expire it. + if ( searcher.isPoisoned() ) { + logger.debug("Index Searcher for {} is poisoned; removing cached searcher", absoluteFile); + expired.add(searcher); + continue; + } + + // if there are no references to the reader, it will have been closed. 
Since there is no + // isClosed() method, this is how we determine whether it's been closed or not. + final int refCount = searcher.getSearcher().getIndexReader().getRefCount(); + if ( refCount <= 0 ) { + // if refCount == 0, then the reader has been closed, so we need to discard the searcher + logger.debug("Reference count for cached Index Searcher for {} is currently {}; " + + "removing cached searcher", absoluteFile, refCount); + expired.add(searcher); + continue; + } + + logger.debug("Providing previously cached index searcher for {}", indexDir); + return searcher.getSearcher(); + } + } + } finally { + // if we have any expired index searchers, we need to close them and remove them + // from the cache so that we don't try to use them again later. + for ( final ActiveIndexSearcher searcher : expired ) { + try { + searcher.close(); + } catch (final Exception e) { + logger.debug("Failed to close 'expired' IndexSearcher {}", searcher); + } + + currentlyCached.remove(searcher); + } + } + } + + final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + if ( writerCount == null ) { + final Directory directory = FSDirectory.open(absoluteFile); + logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir); + + try { + final DirectoryReader directoryReader = DirectoryReader.open(directory); + final IndexSearcher searcher = new IndexSearcher(directoryReader); + + // we want to cache the searcher that we create, since it's just a reader. + final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true); + currentlyCached.add(cached); + + return cached.getSearcher(); + } catch (final IOException e) { + try { + directory.close(); + } catch (final IOException ioe) { + e.addSuppressed(ioe); + } + + throw e; + } + } else { + logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing " + + "counter to {}", indexDir, writerCount.getCount() + 1); + + // increment the writer count to ensure that it's kept open. + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1)); + + // create a new Index Searcher from the writer so that we don't have an issue with trying + // to read from a directory that's locked. If we get the "no segments* file found" with + // Lucene, this indicates that an IndexWriter already has the directory open. + final IndexWriter writer = writerCount.getWriter(); + final DirectoryReader directoryReader = DirectoryReader.open(writer, false); + final IndexSearcher searcher = new IndexSearcher(directoryReader); + + // we don't want to cache this searcher because it's based on a writer, so we want to get + // new values the next time that we search. + final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false); + + currentlyCached.add(activeSearcher); + return activeSearcher.getSearcher(); + } + } finally { + lock.unlock(); + } + } + + + public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) { + final File absoluteFile = indexDirectory.getAbsoluteFile(); + logger.debug("Returning index searcher for {} to IndexManager", indexDirectory); + + lock.lock(); + try { + // check if we already have a reader cached. 
+ final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile); + if ( currentlyCached == null ) { + logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could " + + "result in a resource leak", indexDirectory); + return; + } + + // Check if the given searcher is in our list. We use an Iterator to do this so that if we + // find it we can call remove() on the iterator if need be. + final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator(); + while (itr.hasNext()) { + final ActiveIndexSearcher activeSearcher = itr.next(); + if ( activeSearcher.getSearcher().equals(searcher) ) { + if ( activeSearcher.isCache() ) { + // if the searcher is poisoned, close it and remove from "pool". + if ( activeSearcher.isPoisoned() ) { + itr.remove(); + + try { + logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory); + activeSearcher.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + + return; + } else { + // the searcher is cached. Just leave it open. + logger.debug("Index searcher for {} is cached; leaving open", indexDirectory); + return; + } + } else { + // searcher is not cached. It was created from a writer, and we want + // the newest updates the next time that we get a searcher, so we will + // go ahead and close this one out. + itr.remove(); + + // decrement the writer count because we incremented it when creating the searcher + final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + if ( writerCount != null ) { + if ( writerCount.getCount() <= 1 ) { + try { + logger.debug("Index searcher for {} is not cached. Writer count is " + + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1); + + writerCount.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } else { + logger.debug("Index searcher for {} is not cached. 
Writer count is decremented " + + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1); + + writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), + writerCount.getAnalyzer(), writerCount.getDirectory(), + writerCount.getCount() - 1)); + } + } + + try { + logger.debug("Closing Index Searcher for {}", indexDirectory); + activeSearcher.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + } + } + } + } finally { + lock.unlock(); + } + } + + @Override + public void close() throws IOException { + logger.debug("Closing Index Manager"); + + lock.lock(); + try { + IOException ioe = null; + + for ( final IndexWriterCount count : writerCounts.values() ) { + try { + count.close(); + } catch (final IOException e) { + if ( ioe == null ) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + + for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) { + for (final ActiveIndexSearcher searcher : searcherList) { + try { + searcher.close(); + } catch (final IOException e) { + if ( ioe == null ) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + } + + if ( ioe != null ) { + throw ioe; + } + } finally { + lock.unlock(); + } + } + + + private static void close(final Closeable... closeables) throws IOException { + IOException ioe = null; + for ( final Closeable closeable : closeables ) { + if ( closeable == null ) { + continue; + } + + try { + closeable.close(); + } catch (final IOException e) { + if ( ioe == null ) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + + if ( ioe != null ) { + throw ioe; + } + } + + + private static class ActiveIndexSearcher implements Closeable { + private final IndexSearcher searcher; + private final DirectoryReader directoryReader; + private final Directory directory; + private final boolean cache; + private boolean poisoned = false; + + public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader, + final Directory directory, final boolean cache) { + this.searcher = searcher; + this.directoryReader = directoryReader; + this.directory = directory; + this.cache = cache; + } + + public boolean isCache() { + return cache; + } + + public IndexSearcher getSearcher() { + return searcher; + } + + public boolean isPoisoned() { + return poisoned; + } + + public void poison() { + this.poisoned = true; + } + + @Override + public void close() throws IOException { + IndexManager.close(directoryReader, directory); + } + } + + + private static class IndexWriterCount implements Closeable { + private final IndexWriter writer; + private final Analyzer analyzer; + private final Directory directory; + private final int count; + + public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) { + this.writer = writer; + this.analyzer = analyzer; + this.directory = directory; + this.count = count; + } + + public Analyzer getAnalyzer() { + return analyzer; + } + + public Directory getDirectory() { + return directory; + } + + public IndexWriter getWriter() { + return writer; + } + + public int getCount() { + return count; + } + + @Override + public void close() throws IOException { + IndexManager.close(writer, analyzer, directory); + } + } + +} 
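
IndexManager above shares one IndexWriter per index directory: borrowIndexWriter creates the writer on first use and otherwise re-puts an IndexWriterCount with the count incremented, while returnIndexWriter closes the writer only when the last borrower returns it. The sketch below shows the same borrow/release counting idiom on a generic Closeable resource; RefCountingPool and its method names are assumptions for illustration, not NiFi classes.

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Supplier;

    class RefCountingPool<R extends Closeable> {
        // Immutable pairing of a resource and its borrower count, mirroring IndexWriterCount.
        private static final class Counted<T> {
            private final T resource;
            private final int count;

            private Counted(final T resource, final int count) {
                this.resource = resource;
                this.count = count;
            }
        }

        private final Map<String, Counted<R>> counts = new HashMap<>();

        // Create the resource on first borrow; otherwise just bump the count.
        synchronized R borrow(final String key, final Supplier<R> factory) {
            final Counted<R> existing = counts.get(key);
            final Counted<R> updated = (existing == null)
                    ? new Counted<>(factory.get(), 1)
                    : new Counted<>(existing.resource, existing.count + 1);
            counts.put(key, updated);
            return updated.resource;
        }

        // Close the resource when the last borrower releases it; otherwise decrement.
        synchronized void release(final String key) throws IOException {
            final Counted<R> existing = counts.remove(key);
            if (existing == null) {
                return;  // unknown key; IndexManager logs a potential resource leak here
            }
            if (existing.count <= 1) {
                existing.resource.close();
            } else {
                counts.put(key, new Counted<>(existing.resource, existing.count - 1));
            }
        }
    }

Because the count object is immutable, every increment or decrement replaces the map entry outright, which keeps the bookkeeping trivially consistent under the single lock.
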
http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
index 61f86e7..60328fa 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
@@ -1,155 +1,155 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.toc;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-
-/**
- * Standard implementation of TocReader.
- *
- * Expects .toc file to be in the following format;
- *
- * byte 0: version
- * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
- * byte 2-9: long: offset of block 0
- * byte 10-17: long: offset of block 1
- * ...
- * byte (N*8+2)-(N*8+9): long: offset of block N
- */
-public class StandardTocReader implements TocReader {
-    private final boolean compressed;
-    private final long[] offsets;
-    private final long[] firstEventIds;
-
-    public StandardTocReader(final File file) throws IOException {
-        try (final FileInputStream fis = new FileInputStream(file);
-            final DataInputStream dis = new DataInputStream(fis)) {
-
-            final int version = dis.read();
-            if ( version < 0 ) {
-                throw new EOFException();
-            }
-
-            final int compressionFlag = dis.read();
-            if ( compressionFlag < 0 ) {
-                throw new EOFException();
-            }
-
-            if ( compressionFlag == 0 ) {
-                compressed = false;
-            } else if ( compressionFlag == 1 ) {
-                compressed = true;
-            } else {
-                throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
-            }
-
-            final int blockInfoBytes;
-            switch (version) {
-                case 1:
-                    blockInfoBytes = 8;
-                    break;
-                case 2:
-                default:
-                    blockInfoBytes = 16;
-                    break;
-            }
-
-            final int numBlocks = (int) ((file.length() - 2) / blockInfoBytes);
-            offsets = new long[numBlocks];
-
-            if ( version > 1 ) {
-                firstEventIds = new long[numBlocks];
-            } else {
-                firstEventIds = new long[0];
-            }
-
-            for (int i=0; i < numBlocks; i++) {
-                offsets[i] = dis.readLong();
-
-                if ( version > 1 ) {
-                    firstEventIds[i] = dis.readLong();
-                }
-            }
-        }
-    }
-
-    @Override
-    public boolean isCompressed() {
-        return compressed;
-    }
-
-    @Override
-    public long getBlockOffset(final int blockIndex) {
-        if ( blockIndex >= offsets.length ) {
-            return -1L;
-        }
-        return offsets[blockIndex];
-    }
-
-    @Override
-    public long getLastBlockOffset() {
-        if ( offsets.length == 0 ) {
-            return 0L;
-        }
-        return offsets[offsets.length - 1];
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public int getBlockIndex(final long blockOffset) {
-        for (int i=0; i < offsets.length; i++) {
-            if ( offsets[i] > blockOffset ) {
-                // if the offset is less than the offset of our first block,
-                // just return 0 to indicate the first block. Otherwise,
-                // return i-1 because i represents the first block whose offset is
-                // greater than 'blockOffset'.
-                return (i == 0) ? 0 : i-1;
-            }
-        }
-
-        // None of the blocks have an offset greater than the provided offset.
-        // Therefore, if the event is present, it must be in the last block.
-        return offsets.length - 1;
-    }
-
-    @Override
-    public Integer getBlockIndexForEventId(final long eventId) {
-        // if we don't have event ID's stored in the TOC (which happens for version 1 of the TOC),
-        // or if the event ID is less than the first Event ID in this TOC, then the Event ID
-        // is unknown -- return null.
-        if ( firstEventIds.length == 0 || eventId < firstEventIds[0] ) {
-            return null;
-        }
-
-        for (int i=1; i < firstEventIds.length; i++) {
-            if ( firstEventIds[i] > eventId ) {
-                return i-1;
-            }
-        }
-
-        // None of the blocks start with an Event ID greater than the provided ID.
-        // Therefore, if the event is present, it must be in the last block.
-        return firstEventIds.length - 1;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.toc;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Standard implementation of TocReader.
+ *
+ * Expects .toc file to be in the following format;
+ *
+ * byte 0: version
+ * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
+ * byte 2-9: long: offset of block 0
+ * byte 10-17: long: offset of block 1
+ * ...
+ * byte (N*8+2)-(N*8+9): long: offset of block N
+ */
+public class StandardTocReader implements TocReader {
+    private final boolean compressed;
+    private final long[] offsets;
+    private final long[] firstEventIds;
+
+    public StandardTocReader(final File file) throws IOException {
+        try (final FileInputStream fis = new FileInputStream(file);
+            final DataInputStream dis = new DataInputStream(fis)) {
+
+            final int version = dis.read();
+            if ( version < 0 ) {
+                throw new EOFException();
+            }
+
+            final int compressionFlag = dis.read();
+            if ( compressionFlag < 0 ) {
+                throw new EOFException();
+            }
+
+            if ( compressionFlag == 0 ) {
+                compressed = false;
+            } else if ( compressionFlag == 1 ) {
+                compressed = true;
+            } else {
+                throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
+            }
+
+            final int blockInfoBytes;
+            switch (version) {
+                case 1:
+                    blockInfoBytes = 8;
+                    break;
+                case 2:
+                default:
+                    blockInfoBytes = 16;
+                    break;
+            }
+
+            final int numBlocks = (int) ((file.length() - 2) / blockInfoBytes);
+            offsets = new long[numBlocks];
+
+            if ( version > 1 ) {
+                firstEventIds = new long[numBlocks];
+            } else {
+                firstEventIds = new long[0];
+            }
+
+            for (int i=0; i < numBlocks; i++) {
+                offsets[i] = dis.readLong();
+
+                if ( version > 1 ) {
+                    firstEventIds[i] = dis.readLong();
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean isCompressed() {
+        return compressed;
+    }
+
+    @Override
+    public long getBlockOffset(final int blockIndex) {
+        if ( blockIndex >= offsets.length ) {
+            return -1L;
+        }
+        return offsets[blockIndex];
+    }
+
+    @Override
+    public long getLastBlockOffset() {
+        if ( offsets.length == 0 ) {
+            return 0L;
+        }
+        return offsets[offsets.length - 1];
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public int getBlockIndex(final long blockOffset) {
+        for (int i=0; i < offsets.length; i++) {
+            if ( offsets[i] > blockOffset ) {
+                // if the offset is less than the offset of our first block,
+                // just return 0 to indicate the first block. Otherwise,
+                // return i-1 because i represents the first block whose offset is
+                // greater than 'blockOffset'.
+                return (i == 0) ? 0 : i-1;
+            }
+        }
+
+        // None of the blocks have an offset greater than the provided offset.
+        // Therefore, if the event is present, it must be in the last block.
+        return offsets.length - 1;
+    }
+
+    @Override
+    public Integer getBlockIndexForEventId(final long eventId) {
+        // if we don't have event ID's stored in the TOC (which happens for version 1 of the TOC),
+        // or if the event ID is less than the first Event ID in this TOC, then the Event ID
+        // is unknown -- return null.
+        if ( firstEventIds.length == 0 || eventId < firstEventIds[0] ) {
+            return null;
+        }
+
+        for (int i=1; i < firstEventIds.length; i++) {
+            if ( firstEventIds[i] > eventId ) {
+                return i-1;
+            }
+        }
+
+        // None of the blocks start with an Event ID greater than the provided ID.
+        // Therefore, if the event is present, it must be in the last block.
+        return firstEventIds.length - 1;
+    }
+}
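
As a worked example of the header layout documented in the javadoc above: a version-2 TOC stores 2 header bytes followed by 16 bytes per block (an 8-byte block offset plus an 8-byte first event ID), which is why the constructor computes numBlocks as (fileLength - 2) / blockInfoBytes with blockInfoBytes = 16. The sketch below writes and re-reads such a file using plain java.io streams; the class name TocLayoutDemo and all values are made up for illustration and are not part of the NiFi codebase.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    public class TocLayoutDemo {
        public static void main(final String[] args) throws IOException {
            final File file = File.createTempFile("demo", ".toc");
            file.deleteOnExit();

            // Write a version-2 TOC: version byte, compression-flag byte,
            // then an (offset, firstEventId) pair of longs per block.
            try (final DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) {
                out.write(2);          // version
                out.write(0);          // compressionFlag: 0 = journal not compressed
                out.writeLong(0L);     // block 0 offset
                out.writeLong(0L);     // block 0 first event ID
                out.writeLong(4096L);  // block 1 offset
                out.writeLong(17L);    // block 1 first event ID
            }

            // 2 header bytes + 2 blocks * 16 bytes = 34 bytes on disk,
            // so (34 - 2) / 16 = 2 blocks, matching the constructor's math.
            final int numBlocks = (int) ((file.length() - 2) / 16);
            System.out.println("blocks: " + numBlocks);

            try (final DataInputStream in = new DataInputStream(new FileInputStream(file))) {
                final int version = in.read();
                final boolean compressed = in.read() == 1;
                System.out.println("version=" + version + ", compressed=" + compressed);
                for (int i = 0; i < numBlocks; i++) {
                    System.out.println("block " + i + ": offset=" + in.readLong()
                        + ", firstEventId=" + in.readLong());
                }
            }
        }
    }

This also makes the lookup logic above concrete: with first event IDs {0, 17}, getBlockIndexForEventId(12) scans until it finds 17 > 12 and returns block 0, while an ID below the first stored ID (impossible here, since block 0 starts at 0) would return null.
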
