Repository: nifi Updated Branches: refs/heads/master 6aefc0b91 -> 5fd4a5579
NIFI-2778 added abilty to interrupt Lucene search polishing This closes #1138 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5fd4a557 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5fd4a557 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5fd4a557 Branch: refs/heads/master Commit: 5fd4a55791da27fdba577636ac985a294618328a Parents: 6aefc0b Author: Oleg Zhurakousky <o...@suitcase.io> Authored: Fri Oct 14 10:50:29 2016 -0400 Committer: Matt Burgess <mattyb...@apache.org> Committed: Mon Nov 7 16:28:16 2016 -0500 ---------------------------------------------------------------------- .../nifi/provenance/AsyncQuerySubmission.java | 12 ++++++++++++ .../PersistentProvenanceRepository.java | 2 +- .../provenance/lucene/SimpleIndexManager.java | 18 +++++++++++++++++- 3 files changed, 30 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/5fd4a557/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java index cd2ab39..66858b4 100644 --- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/AsyncQuerySubmission.java @@ -16,7 +16,10 @@ */ package org.apache.nifi.provenance; +import java.util.ArrayList; import java.util.Date; +import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.nifi.provenance.search.Query; @@ -33,6 +36,8 @@ public class AsyncQuerySubmission implements QuerySubmission { private final StandardQueryResult queryResult; private final String submitterId; + private final List<Future<?>> queryExecutions = new ArrayList<>(); + /** * Constructs an AsyncQuerySubmission with the given query and the given * number of steps, indicating how many results must be added to this @@ -65,6 +70,9 @@ public class AsyncQuerySubmission implements QuerySubmission { @Override public void cancel() { this.canceled = true; + for (Future<?> queryExecution : this.queryExecutions) { + queryExecution.cancel(true); + } queryResult.cancel(); } @@ -82,4 +90,8 @@ public class AsyncQuerySubmission implements QuerySubmission { public StandardQueryResult getResult() { return queryResult; } + + public void addQueryExecution(Future<?> execution) { + this.queryExecutions.add(execution); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/5fd4a557/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 0788716..f70bf7d 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -2021,7 +2021,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { result.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0L); } else { for (final File indexDir : indexDirectories) { - queryExecService.submit(new QueryRunnable(query, result, user, indexDir, retrievalCount)); + result.addQueryExecution(queryExecService.submit(new QueryRunnable(query, result, user, indexDir, retrievalCount))); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/5fd4a557/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java index daf6413..9e3bacd 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java @@ -26,6 +26,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; @@ -44,8 +47,21 @@ public class SimpleIndexManager implements IndexManager { private final ConcurrentMap<Object, List<Closeable>> closeables = new ConcurrentHashMap<>(); private final Map<File, IndexWriterCount> writerCounts = new HashMap<>(); + private final ExecutorService searchExecutor = Executors.newCachedThreadPool(); + + @Override public void close() throws IOException { + logger.debug("Shutting down SimpleIndexManager search executor"); + this.searchExecutor.shutdown(); + try { + if (!this.searchExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + this.searchExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + this.searchExecutor.shutdownNow(); + } } @Override @@ -53,7 +69,7 @@ public class SimpleIndexManager implements IndexManager { logger.debug("Creating index searcher for {}", indexDir); final Directory directory = FSDirectory.open(indexDir); final DirectoryReader directoryReader = DirectoryReader.open(directory); - final IndexSearcher searcher = new IndexSearcher(directoryReader); + final IndexSearcher searcher = new IndexSearcher(directoryReader, this.searchExecutor); final List<Closeable> closeableList = new ArrayList<>(2); closeableList.add(directoryReader);