[ https://issues.apache.org/jira/browse/NIFI-5996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mark Payne updated NIFI-5996:
-----------------------------
    Description: 
The suggestion that I am laying out here may result in more complex code that 
is even slower than the current implementation, or it may result in much 
better performance. It is worth spending some time to explore the approach. I 
am taking notes here so that we can revisit them when we have a chance to 
explore changing the serialization implementation.

A very quick update to the FlowFile Repo indicates that we could get 4x better 
performance, perhaps more. However, that update does not properly handle all 
error conditions, etc.; it was just a quick test to see whether the idea is 
worth exploring.

Currently, the way that both of these repositories work is that multiple 
threads can enter the repo in order to write out 'events' to the repo. Each 
thread will then use a serializer to serialize the List of events into a 
byte[]. We then have to enter a synchronized block, and within the 
synchronized block obtain an identifier (in the case of the FlowFile Repo this 
is a Transaction ID; in the case of the Provenance Repo this is an Event ID). 
In both cases, the identifier is a one-up number. After we have incremented a 
counter, we can write the byte[] to a FileOutputStream. This may be done using 
a BufferedOutputStream, but the BufferedOutputStream must be flushed before we 
can return from the method, in order to ensure that we throw an Exception if 
we fail to perform the entire write. And the write+flush have to be done 
within the synchronized block to ensure that we atomically obtain a one-up ID 
and then write the event(s) with that one-up ID without any other threads 
updating the stream in between.
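The current write path described above can be sketched roughly as follows. This is a hypothetical, simplified illustration, not the actual NiFi code; the class and method names are invented, and the record layout (ID, length, payload) is only an assumption for the sketch:

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch of the current approach: each caller serializes its
// events to a byte[] on its own thread, then enters a synchronized block to
// obtain the one-up ID, write the record, and flush before returning.
class SynchronizedRecordWriter {
    private final DataOutputStream out;
    private long nextId = 0; // one-up Transaction/Event ID

    SynchronizedRecordWriter(OutputStream target) {
        this.out = new DataOutputStream(target);
    }

    // Returns the ID assigned to this record. The write + flush happen while
    // holding the lock, so no other thread can interleave bytes mid-record
    // and the ID order matches the on-disk order.
    synchronized long write(byte[] serialized) throws IOException {
        final long id = nextId++;
        out.writeLong(id);
        out.writeInt(serialized.length);
        out.write(serialized);
        out.flush(); // flush before returning so a failed write surfaces here
        return id;
    }
}
```

Every caller pays for the flush individually, which is what makes many small updates expensive.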

When we perform many small updates, this becomes a bottleneck, because every 
thread has to enter the same synchronized block, potentially resulting in 
contention.

The alternate approach that I am suggesting is that instead of entering a 
'synchronized' block to get the one-up number and then writing out to the 
FileOutputStream, we can take that byte[] and put it on a BlockingQueue along 
with a CompletableFuture. We would then have a background thread whose job is 
to poll that queue using a blocking poll. As soon as it obtains a byte[] from 
the queue, it would obtain the one-up ID, write out the ID, write out the 
byte[], and then call queue.poll() without blocking to see whether anything 
else is available. If so, it repeats, until either the queue is empty or it 
has written some number of events (say 10). Once this background thread has 
written out 10 events or so, it can complete each of those CompletableFutures 
(or, if there is an Exception, call completeExceptionally). To ensure that we 
don't write out some of the events but not all, we'd want all of those events 
to be written as a transaction (which may require introducing such a concept 
to the Provenance Repo).
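A minimal sketch of that queue-and-background-thread design might look like the following. All names here are hypothetical, the batch size of 10 and the record layout are assumptions, and error handling is deliberately simplistic (in particular, a mid-batch failure still fails the whole batch via completeExceptionally, but does not roll back bytes already written; that is the transactional gap noted above):

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the proposed approach: callers enqueue their
// serialized byte[] plus a CompletableFuture; a single background thread
// drains the queue, assigns one-up IDs, writes up to a small batch before
// flushing once, then completes (or exceptionally completes) each future.
class QueuedRecordWriter {

    private static class Pending {
        final byte[] data;
        final CompletableFuture<Long> future;
        Pending(byte[] data, CompletableFuture<Long> future) {
            this.data = data;
            this.future = future;
        }
    }

    private static final int MAX_BATCH = 10; // assumed batch size

    private final BlockingQueue<Pending> queue = new LinkedBlockingQueue<>();
    private final DataOutputStream out;
    private final Thread writerThread;
    private long nextId = 0; // only touched by the writer thread
    private volatile boolean running = true;

    QueuedRecordWriter(OutputStream target) {
        this.out = new DataOutputStream(target);
        this.writerThread = new Thread(this::drainLoop, "record-writer");
        writerThread.start();
    }

    // Called by many threads; the returned future completes once the record
    // (and the rest of its batch) has been written and flushed.
    CompletableFuture<Long> enqueue(byte[] serialized) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        queue.add(new Pending(serialized, future));
        return future;
    }

    private void drainLoop() {
        List<Pending> batch = new ArrayList<>();
        while (running || !queue.isEmpty()) {
            try {
                // Blocking poll for the first record of a batch.
                Pending first = queue.poll(100, TimeUnit.MILLISECONDS);
                if (first == null) continue;
                batch.add(first);
                // Non-blocking polls until the queue is empty or the batch
                // reaches MAX_BATCH records.
                Pending next;
                while (batch.size() < MAX_BATCH && (next = queue.poll()) != null) {
                    batch.add(next);
                }
                writeBatch(batch);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } finally {
                batch.clear();
            }
        }
    }

    private void writeBatch(List<Pending> batch) {
        long firstId = nextId;
        try {
            for (Pending p : batch) {
                out.writeLong(nextId++);
                out.writeInt(p.data.length);
                out.write(p.data);
            }
            out.flush(); // one flush per batch instead of one per record
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).future.complete(firstId + i);
            }
        } catch (IOException e) {
            for (Pending p : batch) p.future.completeExceptionally(e);
        }
    }

    void close() throws InterruptedException {
        running = false;
        writerThread.join();
    }
}
```

The key property is that the per-record synchronized write+flush is replaced by a lock-free enqueue, and the flush cost is amortized across up to 10 records.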

By using this approach, we can write 10 events, each potentially coming from 
a different thread, before having to flush the contents to disk. Since these 
events may be much smaller than a single block on disk, we may even fit 10 
events into a single disk block, giving up to a 10x improvement in 
throughput.

Also of note: the call to `CompletableFuture.complete(null)` would be very 
expensive, probably more expensive than writing the serialized events out to 
disk, so it is worth exploring other solutions there. One option is a 
per-caller `BlockingQueue<Throwable>` that holds a failure or a success 
sentinel (a `BlockingQueue` does not permit `null` elements). Another is to 
have one thread responsible for writing/flushing the data and one or more 
additional threads responsible for calling `CompletableFuture.complete`.
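The blocking-queue alternative to a CompletableFuture could be sketched as below. This is purely illustrative; the class name and the success-sentinel convention are invented (a sentinel is needed precisely because `BlockingQueue` rejects `null`):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the cheaper completion idea: each caller hands the
// writer a one-slot queue and blocks on take(); the writer thread puts either
// a success sentinel or the Throwable describing the failure.
class CompletionSlot {
    static final Object SUCCESS = new Object(); // stands in for null/success

    private final BlockingQueue<Object> slot = new ArrayBlockingQueue<>(1);

    // Writer-thread side: report the outcome of the batch write.
    void complete(Throwable failure) throws InterruptedException {
        slot.put(failure == null ? SUCCESS : failure);
    }

    // Caller side: block until the batch containing our record is flushed,
    // rethrowing the writer's failure if there was one.
    void await() throws Throwable {
        Object outcome = slot.take();
        if (outcome != SUCCESS) {
            throw (Throwable) outcome;
        }
    }
}
```

Whether this is actually cheaper than `CompletableFuture.complete` would need to be measured; the point is only that several completion mechanisms are worth benchmarking.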

An additional improvement to the way that the FlowFile Repo serializes data 
is to write 'SWAP OUT' events much more efficiently. Currently, it writes out 
an event for every FlowFile that is swapped out. This could be reduced to a 
single event that includes the IDs of all swapped-out FlowFiles.
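A compact swap-out record along those lines might be serialized as follows. The record-type code, field layout, and class name here are all hypothetical, chosen just to show how one event can carry every swapped-out FlowFile ID:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Hypothetical sketch of a single SWAP_OUT event: rather than one event per
// swapped-out FlowFile, one record carries the swap-file location and the
// IDs of every FlowFile it contains.
class SwapOutSerializer {
    static final byte SWAP_OUT = 4; // invented record-type code

    static byte[] serializeSwapOut(String swapLocation, List<Long> flowFileIds)
            throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeByte(SWAP_OUT);          // record type
        out.writeUTF(swapLocation);       // where the FlowFiles were swapped to
        out.writeInt(flowFileIds.size()); // count, then one long per FlowFile
        for (long id : flowFileIds) {
            out.writeLong(id);
        }
        return buf.toByteArray();
    }
}
```

For N swapped-out FlowFiles this is one record of roughly 8N bytes of IDs plus a small fixed header, instead of N full per-FlowFile events.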


> Consider slight modification to Provenance & FlowFile Repo serialization 
> logic for performance improvement
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-5996
>                 URL: https://issues.apache.org/jira/browse/NIFI-5996
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
