[ https://issues.apache.org/jira/browse/NIFI-15244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ofek Rotem updated NIFI-15244:
------------------------------
    Description: 
Hi team, I'm facing a major issue in my NiFi instance: once the current journal file becomes damaged, every time NiFi tries to update the journal file and roll back, it ends up adding 3 additional flowfiles to the original queue.

This happens in both NiFi 1.28.1 and NiFi 2.3.0.

+Here's the flow of my problem:+
 # I'm deploying a NiFi cluster (version 1.28.1 or 2.3.0) on k8s, using the k8s nodes' storage to store my data.
 # I'm filling the NiFi cluster up by streaming a lot of flowfiles into it.
 # The PVC storing the flowfile repository fills up to 100%.
 # The next time NiFi tries to checkpoint its flowfiles from RAM to disk, it fails while trying to write a header to the journal file:

(Under the *SequentialAccessWriteAheadLog.java* class, inside the *checkpoint* function)
[https://github.com/apache/nifi/blob/ae45d9eb24da76b166f323433a9b91d039caa50d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java]
{code:java}
File journalFile = new File(journalsDirectory, nextTransactionId + ".journal");
while (journalFile.exists()) {
    nextTransactionId++;
    journalFile = new File(journalsDirectory, nextTransactionId + ".journal");
}

journal = new LengthDelimitedJournal<>(journalFile, serdeFactory, streamPool, nextTransactionId);
journal.writeHeader();
{code}

 
NiFi encounters an error:
(Under the *LengthDelimitedJournal.java* class, inside the *writeHeader* function)
[https://github.com/apache/nifi/blob/ae45d9eb24da76b166f323433a9b91d039caa50d/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java]
{code:java}
    // From what I understand, this flush pushes the header to disk, and this
    // is exactly where NiFi fails once the disk is full.
    outStream.flush();
} catch (final Throwable t) {
    poison(t);

    final IOException ioe = (t instanceof IOException) ? (IOException) t : new IOException("Failed to create journal file " + journalFile, t);
    logger.error("Failed to create new journal file {}", journalFile, ioe);
    throw ioe;
}

headerWritten = true;
{code}
 

Since NiFi throws an error, the *{{headerWritten}}* boolean flag remains false for that journal file from now on. I assume the part in which NiFi encounters an error while trying to write the header is expected (since the PVC, i.e. the disk, is full, so even this small number of bytes cannot be written to disk).
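
For context, here is a minimal sketch, paraphrased from memory, of the guard in {{LengthDelimitedJournal.update}} that I believe produces the error described below; the exact code and message may differ:
{code:java}
// Paraphrased sketch of the guard in LengthDelimitedJournal.update():
// once writeHeader() has failed, headerWritten stays false, so every
// subsequent update on this journal fails with an IllegalStateException.
public void update(final Collection<T> records, final RecordLookup<T> recordLookup) throws IOException {
    if (!headerWritten) {
        throw new IllegalStateException("Cannot update journal file " + journalFile
            + " because no header has been written yet");
    }
    // ... append the records to the journal ...
}
{code}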

The weird part is what follows:
+When running a processor in this state (in which the journal file is damaged), the following chain occurs:+
 # Inside the {{onTrigger}} function, the {{session.commitAsync}} function is called (inside the {{AbstractProcessor}} class).
 # The code then calls the {{checkpoint}} method inside the same class.
 # The code calls the {{commit}} function, which will try to update the flowfile repository on disk.
 # The commit tries to update the flowfile repository (line 597).
 # The update fails (the update function on the journal file throws an {*}+IllegalStateException+{*}, since the header was not previously written to the journal file).
 # The code proceeds to the {{catch}} block at line 730, since the function threw an exception (the catch block at line 598 is not entered, since the error is not an {*}+IOException+{*}).
 # The {{rollback}} function is called (line 737).
 # The rollback function calls the {{rollbackRecord}} function (line …).
 # The {{rollbackRecord}} function adds the flowfile back to the original queue. *_NUMBER OF FLOWFILES ADDED BACK TO THE QUEUE = 1_*
 # It then tries to update the flowfile repository once again (line 1278).
 # The update fails (the update function on the journal file throws an {*}+IllegalStateException+{*}, since the header was not previously written to the journal file).
 # The code proceeds to line 743 and throws a *+RuntimeException+*.
 # This time the error is caught inside the {{commitAsync}} function, mentioned in bullet number 1.
 # Another {{rollback}} is called (line …).
 # The {{rollback}} function calls the {{rollbackRecord}} function (line …).
 # The {{rollbackRecord}} function adds the flowfile back to the original queue. *_NUMBER OF FLOWFILES ADDED BACK TO THE QUEUE = 2_*
 # It then tries to update the flowfile repository once again (line 1278).
 # The update fails (the update function on the journal file throws an {*}+IllegalStateException+{*}, since the header was not previously written to the journal file).
 # The code proceeds to line 743 and throws a *+RuntimeException+*.
 # This time the error is caught inside the catch block in the {{onTrigger}} function, mentioned in bullet number 1.
 # Another {{rollback}} is called (line …).
 # The {{rollback}} function calls the {{rollbackRecord}} function (line …).
 # The {{rollbackRecord}} function adds the flowfile back to the original queue. *_NUMBER OF FLOWFILES ADDED BACK TO THE QUEUE = 3_*
 # It then tries to update the flowfile repository once again (line 1278).
 # The update fails (the update function on the journal file throws an {*}+IllegalStateException+{*}, since the header was not previously written to the journal file).
 # The code proceeds to line 743 and throws a *+RuntimeException+*.

*This time nothing catches this error, and the error is shown on screen.* (A toy sketch of this chain follows below.)
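
To make the re-queue arithmetic concrete, here is a self-contained toy program of my own (not NiFi code; {{updateRepository}} and {{rollback}} are stand-ins for the real calls) that mimics the three nested catch layers above and shows why exactly three copies land back on the queue:
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

public class RollbackChainDemo {

    // Stands in for rollbackRecord(): counts copies added back to the queue.
    private static final AtomicInteger requeued = new AtomicInteger();

    // Stands in for wal.update(): always fails, because headerWritten stayed false.
    private static void updateRepository() {
        throw new IllegalStateException("Cannot update journal file: no header has been written yet");
    }

    // Stands in for the session rollback: re-queues the record, then tries to
    // persist the rollback, which fails again and is rethrown as a RuntimeException.
    private static void rollback() {
        requeued.incrementAndGet();
        try {
            updateRepository();
        } catch (final IllegalStateException e) {
            throw new RuntimeException("rollback could not be persisted", e);
        }
    }

    public static void main(final String[] args) {
        try {                                 // stands in for onTrigger's catch (layer 3)
            try {                             // stands in for commitAsync's catch (layer 2)
                try {                         // stands in for commit's catch (layer 1)
                    updateRepository();       // the original commit attempt fails
                } catch (final RuntimeException e) {
                    rollback();               // copy #1, then rethrows
                }
            } catch (final RuntimeException e) {
                rollback();                   // copy #2, then rethrows
            }
        } catch (final RuntimeException e) {
            try {
                rollback();                   // copy #3, then rethrows
            } catch (final RuntimeException unhandled) {
                // In NiFi, nothing catches this one; it is the error shown on screen.
            }
        }
        System.out.println("flowfile copies re-queued: " + requeued.get()); // prints 3
    }
}
{code}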

As demonstrated above, the behavior I'm observing is that when the journal file is damaged, every time a processor tries to act on a flowfile, it ends up adding *three* additional copies of the flowfile to the original queue.

This is of course very *concerning* and *unexpected* behavior.

I wrote about this on the Slack channel ([https://apachenifi.slack.com/archives/C0L9VCD47/p1763595456422929]) and was asked to open a ticket.

I think that even in the long term, the solution to the full disk *must be* to {*}increase the repository size{*}, enabling the checkpoint to happen.

The single behavior I would currently want to eliminate is this:

*_If the journal file becomes damaged, every rollback results in adding new, duplicated flowfiles to the original queue._*

This is the only unexpected part for me, and I think a simple change in the {{wal.update}} function is all that's needed (see the illustrative sketch below).
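
For illustration only, here is one shape such a change could take. This is a hypothetical sketch of my own, not a tested patch: it surfaces the missing header as an IOException, so callers would take the existing IOException handling path (e.g. the catch block at line 598) instead of the IllegalStateException path that starts the triple-rollback chain above.
{code:java}
// Hypothetical sketch only (not a tested patch): report the missing header
// as an IOException so the session's existing IOException handling applies,
// instead of the IllegalStateException that triggers the chained rollbacks.
public void update(final Collection<T> records, final RecordLookup<T> recordLookup) throws IOException {
    if (!headerWritten) {
        throw new IOException("Cannot update journal file " + journalFile
            + " because no header has been written yet");
    }
    // ... unchanged update logic ...
}
{code}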

> Upon damaged journal file, NiFi adds 3 more flowfiles to the queues
> -------------------------------------------------------------------
>
>                 Key: NIFI-15244
>                 URL: https://issues.apache.org/jira/browse/NIFI-15244
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.28.1, 2.3.0
>            Reporter: Ofek Rotem
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
