[ 
https://issues.apache.org/jira/browse/NIFI-2395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15398619#comment-15398619
 ] 

ASF GitHub Bot commented on NIFI-2395:
--------------------------------------

Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/734#discussion_r72735096
  
    --- Diff: 
nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
 ---
    @@ -1668,24 +1669,32 @@ public Thread newThread(final Runnable r) {
                             }
                         });
     
    +                    final AtomicInteger indexingFailureCount = new 
AtomicInteger(0);
                         try {
                             for (int i = 0; i < 
configuration.getIndexThreadPoolSize(); i++) {
                                 final Callable<Object> callable = new 
Callable<Object>() {
                                     @Override
                                     public Object call() throws IOException {
                                         while (!eventQueue.isEmpty() || 
!finishedAdding.get()) {
    -                                        final 
Tuple<StandardProvenanceEventRecord, Integer> tuple;
                                             try {
    -                                            tuple = eventQueue.poll(10, 
TimeUnit.MILLISECONDS);
    -                                        } catch (final 
InterruptedException ie) {
    -                                            continue;
    +                                            final 
Tuple<StandardProvenanceEventRecord, Integer> tuple;
    +                                            try {
    +                                                tuple = 
eventQueue.poll(10, TimeUnit.MILLISECONDS);
    +                                            } catch (final 
InterruptedException ie) {
    +                                                continue;
    --- End diff --
    
    @markap14  we want a "Thread.currentThread().interrupt();" here as well yes?


> PersistentProvenanceRepository Deadlocks caused by a blocked journal merge
> --------------------------------------------------------------------------
>
>                 Key: NIFI-2395
>                 URL: https://issues.apache.org/jira/browse/NIFI-2395
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 0.6.0, 0.7.0
>            Reporter: Brian Davis
>            Assignee: Joseph Witt
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> I have a nifi instance that I have been running for about a week and has 
> deadlocked at least 3 times during this time.  When I say deadlock the whole 
> nifi instance stops doing any progress on flowfiles.  I looked at the stack 
> trace and there are a lot of threads stuck doing tasks in the 
> PersistentProvenanceRepository.  Looking at the code I think this is what is 
> happening:
> There is a ReadWriteLock that all the reads are waiting for a write.  The 
> write is in the loop:
> {code}
>                 while (journalFileCount > journalCountThreshold || repoSize > 
> sizeThreshold) {
>                     // if a shutdown happens while we are in this loop, kill 
> the rollover thread and break
>                     if (this.closed.get()) {
>                         if (future != null) {
>                             future.cancel(true);
>                         }
>                         break;
>                     }
>                     if (repoSize > sizeThreshold) {
>                         logger.debug("Provenance Repository has exceeded its 
> size threshold; will trigger purging of oldest events");
>                         purgeOldEvents();
>                         journalFileCount = getJournalCount();
>                         repoSize = getSize(getLogFiles(), 0L);
>                         continue;
>                     } else {
>                         // if we are constrained by the number of journal 
> files rather than the size of the repo,
>                         // then we will just sleep a bit because another 
> thread is already actively merging the journals,
>                         // due to the runnable that we scheduled above
>                         try {
>                             Thread.sleep(100L);
>                         } catch (final InterruptedException ie) {
>                         }
>                     }
>                     logger.debug("Provenance Repository is still behind. 
> Keeping flow slowed down "
>                             + "to accommodate. Currently, there are {} 
> journal files ({} bytes) and "
>                             + "threshold for blocking is {} ({} bytes)", 
> journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
>                     journalFileCount = getJournalCount();
>                     repoSize = getSize(getLogFiles(), 0L);
>                 }
>                 logger.info("Provenance Repository has now caught up with 
> rolling over journal files. Current number of "
>                         + "journal files to be rolled over is {}", 
> journalFileCount);
>             }
> {code}
> My nifi is at the sleep indefinitely.  The reason my nifi cannot move forward 
> is because of the thread doing the merge is stopped.  The thread doing the 
> merge is at:
> {code}
> accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, 
> TimeUnit.MILLISECONDS);
> {code}
> so the queue is full.  
> What I believe happened is that the callables created here:
> {code}
>                             final Callable<Object> callable = new 
> Callable<Object>() {
>                                 @Override
>                                 public Object call() throws IOException {
>                                     while (!eventQueue.isEmpty() || 
> !finishedAdding.get()) {
>                                         final 
> Tuple<StandardProvenanceEventRecord, Integer> tuple;
>                                         try {
>                                             tuple = eventQueue.poll(10, 
> TimeUnit.MILLISECONDS);
>                                         } catch (final InterruptedException 
> ie) {
>                                             continue;
>                                         }
>                                         if (tuple == null) {
>                                             continue;
>                                         }
>                                         indexingAction.index(tuple.getKey(), 
> indexWriter, tuple.getValue());
>                                     }
>                                     return null;
>                                 }
> {code}
> finish before the offer adds its first event because I do not see any Index 
> Provenance Events threads.  My guess is the while loop condition is wrong and 
> should be && instead of ||.
> I upped the thread count for the index creation from 1 to 3 to see if that 
> helps.  I can tell you if that helps later this week.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to