[
https://issues.apache.org/jira/browse/NIFI-15244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18047221#comment-18047221
]
ASF subversion and git services commented on NIFI-15244:
--------------------------------------------------------
Commit 056c16a38300af51463ae1188576009cf2f54ba8 in nifi's branch
refs/heads/main from Ofek Rotem
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=056c16a383 ]
NIFI-15244 Adjusting update logic to prevent rollback from duplicating
FlowFiles upon journal update failure
Signed-off-by: Pierre Villard <[email protected]>
This closes #10677.
> Upon damaged journal file, NiFi adds 3 more flowfiles to the queues
> -------------------------------------------------------------------
>
> Key: NIFI-15244
> URL: https://issues.apache.org/jira/browse/NIFI-15244
> Project: Apache NiFi
> Issue Type: Bug
> Affects Versions: 1.28.1, 2.3.0
> Reporter: Ofek Rotem
> Assignee: Ofek Rotem
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Hi team, I'm facing a major issue in my NiFi instance: once the current journal file becomes damaged, every time NiFi tries to update the journal file and roll back, it ends up adding 3 additional flowfiles to the original queue.
> This happens in both NiFi 1.28.1 and NiFi 2.3.0.
> +Here's the flow of my problem:+
> # I'm deploying a NiFi cluster (version 1.28.1 or 2.3.0) on k8s, using the k8s nodes' storage to store my data.
> # I fill up my NiFi cluster by streaming a lot of flowfiles into it.
> # The PVC storing the flowfile repository fills up to 100%.
> # The next time NiFi tries to checkpoint its flowfiles from RAM to disk, it attempts to write a header to the journal file (under the *SequentialAccessWriteAheadLog.java* class, inside the *checkpoint* function):
> [https://github.com/apache/nifi/blob/ae45d9eb24da76b166f323433a9b91d039caa50d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java]
> {code:java}
> File journalFile = new File(journalsDirectory, nextTransactionId + ".journal");
> while (journalFile.exists()) {
>     nextTransactionId++;
>     journalFile = new File(journalsDirectory, nextTransactionId + ".journal");
> }
> journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, nextTransactionId);
> journal.writeHeader();
> {code}
>
> NiFi encounters an error under the *LengthDelimitedJournal.java* class, inside the *writeHeader* function:
> [https://github.com/apache/nifi/blob/ae45d9eb24da76b166f323433a9b91d039caa50d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java]
> {code:java}
>     outStream.flush(); // From what I understand, this flushes the changes to disk, and right here NiFi fails
> } catch (final Throwable t) {
>     poison(t);
>     final IOException ioe = (t instanceof IOException) ? (IOException) t : new IOException("Failed to create journal file " + journalFile, t);
>     logger.error("Failed to create new journal file {}", journalFile, ioe);
>     throw ioe;
> }
> headerWritten = true;
> {code}
>
> Since NiFi throws an error, the *{{headerWritten}}* boolean flag remains false for that journal file from then on. I assume the part where NiFi encounters an error when trying to write the header is expected (since the PVC, i.e. the disk, is full, and thus even this small number of bytes cannot be written to disk).
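> To illustrate why the flag staying false breaks every later update, here is a minimal, simplified sketch of the guard I believe the journal's update path performs. This is my own approximation, not the actual NiFi source, and the class name is a stand-in:
> {code:java}
> import java.io.File;
> import java.io.IOException;
> import java.util.Collection;
>
> // Simplified approximation (NOT the real NiFi class) of why every update fails
> // once the header write has failed: the flag is never set, so the guard always throws.
> class JournalSketch<T> {
>     private final File journalFile;
>     private volatile boolean headerWritten = false;
>
>     JournalSketch(final File journalFile) {
>         this.journalFile = journalFile;
>     }
>
>     void writeHeader() throws IOException {
>         // ... write and flush the header; if the disk is full, this throws before
>         // the flag below is ever set, so headerWritten stays false for this journal ...
>         headerWritten = true;
>     }
>
>     void update(final Collection<T> records) throws IOException {
>         if (!headerWritten) {
>             // This is the IllegalStateException seen on every subsequent commit attempt
>             throw new IllegalStateException("Cannot update " + journalFile + ": header was never written");
>         }
>         // ... append the records to the journal ...
>     }
> }
> {code}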
> The weird part is what follows:
> +When running a processor in this state (where the journal file is damaged), the following chain occurs (a stripped-down sketch follows this list):+
> # Inside the {{onTrigger}} function, the {{session.commitAsync}} function is called (inside the {{AbstractProcessor}} class)
> # The code then calls the {{checkpoint}} method inside the same class
> # The code calls the {{commit}} function, which will try to update the flowfile repository on disk
> # The commit tries to update the flowfile repository (line number 597)
> # The update fails (the update function of the journal file throws an *+IllegalStateException+*, since the header was not previously written to the journal file)
> # The code proceeds to the {{catch}} block at line 730, since the function threw an exception (the catch block at line 598 that follows is not entered, since the error is not an *+IOException+*)
> # The {{rollback}} function is called (line 737)
> # The {{rollback}} function calls the {{rollbackRecord}} function (line
> # The {{rollbackRecord}} function adds the flowfile back to the original queue. *_NUMBER OF FLOWFILES ADDED BACK TO THE QUEUE = 1_*
> # It then tries to update the flowfile repository once again (line 1278)
> # The update fails (the update function of the journal file throws an *+IllegalStateException+*, since the header was not previously written to the journal file)
> # The code proceeds to line 743 and throws a *+RuntimeException+*
> # This time the error is caught inside the {{commitAsync}} function, mentioned in bullet number 1
> # Another {{rollback}} is called (line
> # The {{rollback}} function calls the {{rollbackRecord}} function (line
> # The {{rollbackRecord}} function adds the flowfile back to the original queue. *_NUMBER OF FLOWFILES ADDED BACK TO THE QUEUE = 2_*
> # It then tries to update the flowfile repository once again (line 1278)
> # The update fails (the update function of the journal file throws an *+IllegalStateException+*, since the header was not previously written to the journal file)
> # The code proceeds to line 743 and throws a *+RuntimeException+*
> # This time the error is caught inside the catch block of the {{onTrigger}} function, mentioned in bullet number 1
> # Another {{rollback}} is called (line
> # The {{rollback}} function calls the {{rollbackRecord}} function (line
> # The {{rollbackRecord}} function adds the flowfile back to the original queue. *_NUMBER OF FLOWFILES ADDED BACK TO THE QUEUE = 3_*
> # It then tries to update the flowfile repository once again (line 1278)
> # The update fails (the update function of the journal file throws an *+IllegalStateException+*, since the header was not previously written to the journal file)
> # The code proceeds to line 743 and throws a *+RuntimeException+*
> *This time nothing catches this error, and the error is shown on screen.*
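> Here is the stripped-down sketch mentioned above, to make the triple rollback easier to follow. All class and method names are simplified stand-ins of my own, not the actual NiFi call sites; it only mirrors the nesting of the three catch layers:
> {code:java}
> // Simplified stand-in for the three catch layers described above (not the real NiFi code).
> class TripleRollbackSketch {
>     static int requeued = 0;                  // counts copies put back on the original queue
>
>     static void updateRepository() {          // stands in for the journal/WAL update
>         throw new IllegalStateException("header was never written");
>     }
>
>     static void rollback() {                  // stands in for rollback() -> rollbackRecord()
>         requeued++;                           // the flowfile is re-added to its original queue
>         try {
>             updateRepository();               // rollback tries to update the repository again
>         } catch (final RuntimeException e) {
>             // swallowed here; the caller throws its own RuntimeException afterwards
>         }
>     }
>
>     static void commit() {                    // layer 1: checkpoint/commit
>         try {
>             updateRepository();
>         } catch (final Exception e) {
>             rollback();                       // copy #1
>             throw new RuntimeException(e);
>         }
>     }
>
>     static void commitAsync() {               // layer 2: session.commitAsync
>         try {
>             commit();
>         } catch (final RuntimeException e) {
>             rollback();                       // copy #2
>             throw e;
>         }
>     }
>
>     public static void main(final String[] args) {
>         try {                                 // layer 3: onTrigger / framework error handling
>             commitAsync();
>         } catch (final RuntimeException e) {
>             rollback();                       // copy #3
>         }
>         System.out.println("Copies re-added to the queue: " + requeued);  // prints 3
>     }
> }
> {code}
> Running this prints 3, which matches the three duplicated flowfiles I see per processor invocation.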
> As demonstrated above, the behavior I'm observing is that when the journal file is damaged, every time a processor tries to act on a flowfile, it ends up adding *three* additional copies of that flowfile to the original queue, which is of course very *concerning* and *unexpected* behavior.
> I wrote about this on the Slack channel ([https://apachenifi.slack.com/archives/C0L9VCD47/p1763595456422929]) and was asked to open a ticket.
> I think that, even in the long term, the solution *must be* to *increase the repository size*, enabling the checkpoint to happen.
> The single behavior I would like to eliminate is this:
> *_if the journal file becomes damaged, every rollback results in adding new, duplicated flowfiles to the original queue._*
> This is the only unexpected part for me, and for that I think a simple change in the {{wal.update}} function is needed.
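> For illustration only, here is one hypothetical shape such a change could take: translating the {{IllegalStateException}} into an {{IOException}} so callers take their existing {{IOException}} handling path. This is my own sketch of the idea, not the fix that was actually committed, and the journal interface below is a stand-in:
> {code:java}
> import java.io.IOException;
> import java.util.Collection;
>
> // Hypothetical sketch only -- NOT the committed fix. The Journal interface is a stand-in.
> class WalUpdateSketch<T> {
>     interface Journal<R> {
>         void update(Collection<R> records) throws IOException;  // may throw IllegalStateException when unusable
>     }
>
>     private final Journal<T> journal;
>
>     WalUpdateSketch(final Journal<T> journal) {
>         this.journal = journal;
>     }
>
>     // If the journal refuses the update because its header was never written,
>     // surface that as an IOException so the caller's existing IOException handling
>     // is used instead of cascading RuntimeExceptions through repeated rollbacks.
>     void update(final Collection<T> records) throws IOException {
>         try {
>             journal.update(records);
>         } catch (final IllegalStateException ise) {
>             throw new IOException("Cannot update the Write-Ahead Log: journal is unusable", ise);
>         }
>     }
> }
> {code}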