[
https://issues.apache.org/jira/browse/NIFI-5996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mark Payne updated NIFI-5996:
-----------------------------
Description:
The suggestion that I am laying out here may result in more complex code that
is even slower than the current implementation, or it may result in much
better performance. It is worth spending some time to explore the approach. I
am taking notes here so that we can revisit them when we have a chance to
explore changing the serialization implementation.
A very quick update to the FlowFile Repo indicates we could get 4x better
performance, perhaps more. However, that change does not properly handle all
error conditions, etc.; it was just a quick test to see whether the idea is
worth exploring.
Currently, both of these repositories work the same way: multiple threads can
enter the repo in order to write out 'events'. Each thread uses a serializer
to serialize its List of events into a byte[]. We then have to enter a
synchronized block and, within it, obtain an identifier (in the case of the
FlowFile Repo this is a Transaction ID; in the case of the Provenance Repo it
is an Event ID). In both cases, the identifier is a one-up number. After
incrementing the counter, we write the byte[] to a FileOutputStream, possibly
via a BufferedOutputStream; but the BufferedOutputStream must be flushed
before the method returns, in order to ensure that we throw an Exception if
we fail to perform the entire write. And the write+flush must be done within
the synchronized block to ensure that we atomically obtain a one-up ID and
then write the event(s) with that one-up ID without any other thread updating
the stream in between.
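The current write path could be sketched roughly as follows (class and method
names here are illustrative, not NiFi's actual classes):

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch of the current write path described above: each caller
// serializes its events itself, then enters a synchronized block to obtain
// the one-up ID and perform the write+flush atomically.
class SynchronizedEventWriter {
    private final DataOutputStream out;  // in NiFi this would wrap a FileOutputStream
    private long nextId = 0L;            // one-up Transaction ID / Event ID

    SynchronizedEventWriter(OutputStream target) {
        this.out = new DataOutputStream(target);
    }

    long write(byte[] serializedEvents) throws IOException {
        synchronized (this) {
            final long id = nextId++;              // atomically obtain the one-up ID
            out.writeLong(id);                     // write the ID...
            out.writeInt(serializedEvents.length);
            out.write(serializedEvents);           // ...then the serialized events
            out.flush();                           // flush so a failed write surfaces here
            return id;
        }
    }
}
```

Every caller pays for the flush, and all callers serialize on the same lock.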
When we perform many small updates, this becomes a bottleneck: every thread
must enter the synchronized block, potentially resulting in contention.
The alternative approach that I am suggesting is that instead of entering a
'synchronized' block to get the one-up number and then writing to the
FileOutputStream, we take that byte[] and put it on a BlockingQueue along
with a CompletableFuture. A background thread then polls that queue with a
blocking poll. As soon as it obtains a byte[] from the queue, it obtains the
one-up ID, writes out the ID, writes out the byte[], and then calls
queue.poll() without blocking to see if anything else is waiting. If so, it
repeats, until either the queue is empty or it has written some number of
events (say 10). Once the background thread has written out 10 events or so,
it can complete each of those CompletableFutures (or, if there was an
Exception, call completeExceptionally). To ensure that we don't write out
some of the events but not all, we'd want all of those events to be written
as a transaction (which may require introducing such a concept to the
Provenance Repo).
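The queued approach could be sketched like this (again, all names are
illustrative; error handling and the transaction concept are elided):

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the queued approach described above. Callers
// enqueue (payload, future) pairs; a single background thread drains up to
// MAX_BATCH entries, assigns one-up IDs, writes them all, flushes once, and
// only then completes the futures.
class QueuedEventWriter {
    private static final int MAX_BATCH = 10;

    static final class PendingWrite {
        final byte[] payload;
        final CompletableFuture<Long> future = new CompletableFuture<>();
        PendingWrite(byte[] payload) { this.payload = payload; }
    }

    private final BlockingQueue<PendingWrite> queue = new LinkedBlockingQueue<>();
    private final DataOutputStream out;
    private long nextId = 0L;  // touched only by the background thread

    QueuedEventWriter(OutputStream target) {
        this.out = new DataOutputStream(target);
    }

    // Called by the many writer threads: no synchronization, no flush.
    CompletableFuture<Long> submit(byte[] serializedEvents) {
        PendingWrite pw = new PendingWrite(serializedEvents);
        queue.add(pw);
        return pw.future;
    }

    // One iteration of the background thread: a blocking take, then
    // non-blocking polls until the queue is empty or the batch is full.
    void drainOnce() throws InterruptedException {
        List<PendingWrite> batch = new ArrayList<>();
        batch.add(queue.take());
        PendingWrite next;
        while (batch.size() < MAX_BATCH && (next = queue.poll()) != null) {
            batch.add(next);
        }
        try {
            List<Long> ids = new ArrayList<>();
            for (PendingWrite pw : batch) {
                long id = nextId++;
                out.writeLong(id);
                out.writeInt(pw.payload.length);
                out.write(pw.payload);
                ids.add(id);
            }
            out.flush();  // a single flush amortized over the whole batch
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).future.complete(ids.get(i));
            }
        } catch (IOException e) {
            for (PendingWrite pw : batch) {
                pw.future.completeExceptionally(e);
            }
        }
    }
}
```

The key property is that the flush, the most expensive step, now happens once
per batch rather than once per caller.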
By using this approach, we can write 10 events, each potentially coming from
a different thread, before having to flush the contents to disk. Since these
events may be much smaller than a single block on disk, we may even fit all
10 events in a single disk block, giving up to a 10x improvement in
throughput.
Also of note: the call to `CompletableFuture.complete(null)` would be very
expensive, probably more expensive than writing the serialized events out to
disk, so it is worth exploring other solutions there. For example, a
`BlockingQueue<Throwable>` per caller that holds a failure, or a success
sentinel (BlockingQueue implementations reject `null` elements). Or
potentially have one thread that is responsible for writing the data and
flushing, and another 1+ threads responsible for calling
`CompletableFuture.complete`, etc.
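That last split could be sketched as follows (hypothetical names; the
batching and writing logic is assumed to live elsewhere):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the split described above: the writer thread only
// writes and flushes; completing the futures is handed off to a separate
// executor so complete() callbacks, however expensive, never stall the
// writer.
class CompletionOffload {
    // Daemon thread so this sketch does not keep a JVM alive on its own.
    private final ExecutorService completionExecutor =
            Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "future-completion");
                t.setDaemon(true);
                return t;
            });

    // Called by the writer thread after a batch has been flushed.
    void completeBatch(List<CompletableFuture<Long>> futures, long firstId) {
        completionExecutor.execute(() -> {
            long id = firstId;
            for (CompletableFuture<Long> f : futures) {
                f.complete(id++);  // runs on the completion thread, not the writer
            }
        });
    }
}
```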
An additional improvement to the way that the FlowFile Repo serializes data
would be to write a 'SWAP OUT' much more efficiently. Currently, the repo
writes out an event for every FlowFile that is swapped out. This could be
reduced to a single event that includes the IDs of all of the swapped-out
FlowFiles.
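A compact swap-out record could look something like this (a sketch; the field
layout is an assumption, not the FlowFile Repo's actual format):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Hypothetical sketch of the single SWAP OUT event suggested above: instead
// of one repository event per swapped-out FlowFile, a single record carrying
// the swap file location and the IDs of every FlowFile in it.
final class SwapOutRecord {
    static byte[] serialize(String swapLocation, List<Long> flowFileIds) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeUTF(swapLocation);        // where the FlowFiles were swapped to
        out.writeInt(flowFileIds.size());  // how many IDs follow
        for (long id : flowFileIds) {
            out.writeLong(id);             // one ID per swapped-out FlowFile
        }
        return baos.toByteArray();
    }
}
```

The record grows by only 8 bytes per FlowFile instead of a full event per
FlowFile.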
> Consider slight modification to Provenance & FlowFile Repo serialization
> logic for performance improvement
> ----------------------------------------------------------------------------------------------------------
>
> Key: NIFI-5996
> URL: https://issues.apache.org/jira/browse/NIFI-5996
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)