Repository: nifi
Updated Branches:
  refs/heads/master 58cf15a91 -> 4fdea680e


NIFI-3905: This closes #1805. When a Provenance Query is submitted to 
WriteAheadProvenanceRepository, purge any obsolete queries from the internal 
state before rejecting the query due to 'too many outstanding queries'

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4fdea680
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4fdea680
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4fdea680

Branch: refs/heads/master
Commit: 4fdea680ecd9f4ea982961f37bb870b4bbad7e45
Parents: 58cf15a
Author: Mark Payne <[email protected]>
Authored: Tue May 16 10:06:51 2017 -0400
Committer: joewitt <[email protected]>
Committed: Tue May 16 13:48:16 2017 -0400

----------------------------------------------------------------------
 .../index/lucene/LuceneEventIndex.java          | 50 ++++++++++----------
 1 file changed, 25 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4fdea680/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
index f4b47d3..4a38071 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java
@@ -154,7 +154,7 @@ public class LuceneEventIndex implements EventIndex {
 
         maintenanceExecutor = Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("Provenance Repository Maintenance"));
         maintenanceExecutor.scheduleWithFixedDelay(() -> performMaintenance(), 
1, 1, TimeUnit.MINUTES);
-        maintenanceExecutor.scheduleWithFixedDelay(new 
RemoveExpiredQueryResults(), 30, 30, TimeUnit.SECONDS);
+        maintenanceExecutor.scheduleWithFixedDelay(this::purgeObsoleteQueries, 
30, 30, TimeUnit.SECONDS);
 
         cachedQueries.add(new LatestEventsQuery());
         cachedQueries.add(new LatestEventsPerProcessorQuery());
@@ -633,8 +633,11 @@ public class LuceneEventIndex implements EventIndex {
     private void validate(final Query query) {
         final int numQueries = querySubmissionMap.size();
         if (numQueries > MAX_UNDELETED_QUERY_RESULTS) {
-            throw new IllegalStateException("Cannot process query because 
there are currently " + numQueries + " queries whose results have not "
-                + "been deleted due to poorly behaving clients not issuing 
DELETE requests. Please try again later.");
+            purgeObsoleteQueries();
+            if (querySubmissionMap.size() > MAX_UNDELETED_QUERY_RESULTS) {
+                throw new IllegalStateException("Cannot process query because 
there are currently " + numQueries + " queries whose results have not "
+                    + "been deleted due to poorly behaving clients not issuing 
DELETE requests. Please try again later.");
+            }
         }
 
         if (query.getEndDate() != null && query.getStartDate() != null && 
query.getStartDate().getTime() > query.getEndDate().getTime()) {
@@ -702,35 +705,32 @@ public class LuceneEventIndex implements EventIndex {
         return removed;
     }
 
-    private class RemoveExpiredQueryResults implements Runnable {
-        @Override
-        public void run() {
-            try {
-                final Date now = new Date();
+    private void purgeObsoleteQueries() {
+        try {
+            final Date now = new Date();
 
-                final Iterator<Map.Entry<String, AsyncQuerySubmission>> 
queryIterator = querySubmissionMap.entrySet().iterator();
-                while (queryIterator.hasNext()) {
-                    final Map.Entry<String, AsyncQuerySubmission> entry = 
queryIterator.next();
+            final Iterator<Map.Entry<String, AsyncQuerySubmission>> 
queryIterator = querySubmissionMap.entrySet().iterator();
+            while (queryIterator.hasNext()) {
+                final Map.Entry<String, AsyncQuerySubmission> entry = 
queryIterator.next();
 
-                    final StandardQueryResult result = 
entry.getValue().getResult();
-                    if (entry.getValue().isCanceled() || result.isFinished() 
&& result.getExpiration().before(now)) {
-                        queryIterator.remove();
-                    }
+                final StandardQueryResult result = 
entry.getValue().getResult();
+                if (entry.getValue().isCanceled() || result.isFinished() && 
result.getExpiration().before(now)) {
+                    queryIterator.remove();
                 }
+            }
 
-                final Iterator<Map.Entry<String, AsyncLineageSubmission>> 
lineageIterator = lineageSubmissionMap.entrySet().iterator();
-                while (lineageIterator.hasNext()) {
-                    final Map.Entry<String, AsyncLineageSubmission> entry = 
lineageIterator.next();
+            final Iterator<Map.Entry<String, AsyncLineageSubmission>> 
lineageIterator = lineageSubmissionMap.entrySet().iterator();
+            while (lineageIterator.hasNext()) {
+                final Map.Entry<String, AsyncLineageSubmission> entry = 
lineageIterator.next();
 
-                    final StandardLineageResult result = 
entry.getValue().getResult();
-                    if (entry.getValue().isCanceled() || result.isFinished() 
&& result.getExpiration().before(now)) {
-                        lineageIterator.remove();
-                    }
+                final StandardLineageResult result = 
entry.getValue().getResult();
+                if (entry.getValue().isCanceled() || result.isFinished() && 
result.getExpiration().before(now)) {
+                    lineageIterator.remove();
                 }
-            } catch (final Exception e) {
-                logger.error("Failed to expire Provenance Query Results due to 
{}", e.toString());
-                logger.error("", e);
             }
+        } catch (final Exception e) {
+            logger.error("Failed to expire Provenance Query Results due to 
{}", e.toString());
+            logger.error("", e);
         }
     }
 }

Reply via email to