[
https://issues.apache.org/jira/browse/NIFI-2395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395721#comment-15395721
]
Mark Payne commented on NIFI-2395:
----------------------------------
Brian - I have looked into this. I believe the condition in the while loop is
actually correct. If we get rid of the Java syntax and focus on what it means
in English, it is saying:
while ("there are events on the queue" OR "more events will be added to the
queue") {
// wait for an event and index it
}
So I do not believe the case is that the thread evaluated the logic and
finished before any messages were added to the queue.
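To illustrate why that condition does not let the consumer exit early, here is a minimal, self-contained sketch. It is not the NiFi code: the class and method names (LoopConditionSketch, drain, consumerWaitsForProducer) are made up for illustration, and a plain String stands in for the event tuple.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the indexing task; not part of NiFi.
public class LoopConditionSketch {

    // Same shape as the quoted condition: keep going while there are events
    // on the queue OR more events will be added to the queue.
    static void drain(BlockingQueue<String> queue, AtomicBoolean finishedAdding)
            throws InterruptedException {
        while (!queue.isEmpty() || !finishedAdding.get()) {
            queue.poll(10, TimeUnit.MILLISECONDS); // a real task would index the event here
        }
    }

    // Starts the consumer before anything is queued and reports whether it
    // was still running at that point.
    static boolean consumerWaitsForProducer() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean finishedAdding = new AtomicBoolean(false);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> consumer = executor.submit(() -> {
                drain(queue, finishedAdding);
                return null;
            });
            Thread.sleep(100);                      // queue is still empty here
            boolean stillRunning = !consumer.isDone();
            queue.put("event");
            finishedAdding.set(true);               // only now may the loop end
            consumer.get(5, TimeUnit.SECONDS);      // consumer drains and returns
            return stillRunning;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

As long as finishedAdding is false, the second half of the condition keeps the loop alive even though the queue is empty, so consumerWaitsForProducer() should report that the consumer was still running before the first event arrived.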
Rather, my suspicion is that the call to indexingAction.index() threw an
Exception. Unfortunately, if that occurs, the task terminates prematurely,
and this could leave the other thread blocked on queue.offer().
I think we can address this pretty easily by updating the code that calls
queue.offer() such that if queue.offer() returns false, we check the 'futures'
list to see if any futures have terminated exceptionally. If so, the
thread that calls queue.offer() should re-throw that exception. This is exactly
what happens after this thread has finished calling queue.offer(), but as you
are seeing, I think we also need to do it if we are unable to
add events to the queue.
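A rough sketch of that idea follows. It is only an illustration under assumptions, not the eventual patch: the class and helper names (OfferWithFutureCheckSketch, rethrowIfFailed, offerOrFail, demo) are hypothetical, a String stands in for the event tuple, and wrapping the failure in an IOException is just one possible way to re-throw it.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names and exception wrapping are assumptions.
public class OfferWithFutureCheckSketch {

    // If any indexing task has already terminated exceptionally, re-throw its cause.
    static void rethrowIfFailed(List<Future<?>> futures) throws IOException {
        for (Future<?> future : futures) {
            if (!future.isDone() || future.isCancelled()) {
                continue;
            }
            try {
                future.get(); // does not block: the task is already done
            } catch (ExecutionException e) {
                throw new IOException("Indexing task failed", e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while checking indexing tasks", e);
            }
        }
    }

    // Producer side: every time offer() returns false, check the futures
    // instead of retrying blindly.
    static void offerOrFail(BlockingQueue<String> queue, String event,
                            List<Future<?>> futures) throws IOException, InterruptedException {
        while (!queue.offer(event, 10, TimeUnit.MILLISECONDS)) {
            rethrowIfFailed(futures);
        }
    }

    // Demo: the only consumer fails on its first event, so the queue fills up.
    // Returns the message of the failure that reached the producer.
    static String demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Future<?>> futures = new ArrayList<>();
        try {
            Callable<Object> failingTask = () -> {
                queue.take();
                throw new IOException("simulated index failure");
            };
            futures.add(executor.submit(failingTask));
            for (int i = 0; i < 100; i++) {
                offerOrFail(queue, "event-" + i, futures);
            }
            return "no failure observed";
        } catch (IOException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In the demo, the producer does not spin on a full queue forever: once the consumer has died, the next failed offer() surfaces the consumer's exception to the producing thread.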
I will work on getting a PR in that addresses this issue. Do you agree that
this makes sense?
> PersistentProvenanceRepository Deadlocks caused by a blocked journal merge
> --------------------------------------------------------------------------
>
> Key: NIFI-2395
> URL: https://issues.apache.org/jira/browse/NIFI-2395
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Affects Versions: 0.6.0, 0.7.0
> Reporter: Brian Davis
> Assignee: Joseph Witt
> Priority: Blocker
>
> I have a nifi instance that I have been running for about a week, and it has
> deadlocked at least 3 times during that time. By deadlock I mean that the whole
> nifi instance stops making any progress on flowfiles. I looked at the stack
> trace and there are a lot of threads stuck doing tasks in the
> PersistentProvenanceRepository. Looking at the code I think this is what is
> happening:
> There is a ReadWriteLock on which all the reads are waiting for a write. The
> write is in the loop:
> {code}
> while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
>     // if a shutdown happens while we are in this loop, kill the rollover thread and break
>     if (this.closed.get()) {
>         if (future != null) {
>             future.cancel(true);
>         }
>         break;
>     }
>     if (repoSize > sizeThreshold) {
>         logger.debug("Provenance Repository has exceeded its size threshold; will trigger purging of oldest events");
>         purgeOldEvents();
>         journalFileCount = getJournalCount();
>         repoSize = getSize(getLogFiles(), 0L);
>         continue;
>     } else {
>         // if we are constrained by the number of journal files rather than the size of the repo,
>         // then we will just sleep a bit because another thread is already actively merging the journals,
>         // due to the runnable that we scheduled above
>         try {
>             Thread.sleep(100L);
>         } catch (final InterruptedException ie) {
>         }
>     }
>     logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
>         + "to accommodate. Currently, there are {} journal files ({} bytes) and "
>         + "threshold for blocking is {} ({} bytes)",
>         journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
>     journalFileCount = getJournalCount();
>     repoSize = getSize(getLogFiles(), 0L);
> }
> logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
>     + "journal files to be rolled over is {}", journalFileCount);
> }
> {code}
> My nifi is stuck at the sleep indefinitely. It cannot move forward
> because the thread doing the merge is stopped. The thread doing the
> merge is at:
> {code}
> accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
> {code}
> so the queue is full.
> What I believe happened is that the callables created here:
> {code}
> final Callable<Object> callable = new Callable<Object>() {
>     @Override
>     public Object call() throws IOException {
>         while (!eventQueue.isEmpty() || !finishedAdding.get()) {
>             final Tuple<StandardProvenanceEventRecord, Integer> tuple;
>             try {
>                 tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
>             } catch (final InterruptedException ie) {
>                 continue;
>             }
>             if (tuple == null) {
>                 continue;
>             }
>             indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
>         }
>         return null;
>     }
> {code}
> finish before the offer adds its first event, because I do not see any Index
> Provenance Events threads. My guess is that the while loop condition is wrong
> and should be && instead of ||.
> I upped the thread count for the index creation from 1 to 3 to see if that
> helps. I can tell you later this week whether it does.