loquisgon opened a new issue #11231:
URL: https://github.com/apache/druid/issues/11231


   ### Motivation
   
   Currently native batch ingestion has no guardrails with respect to its memory consumption. Native batch internally keeps creating data structures (such as `Sinks` and `FireHydrants`) roughly in proportion to the number of segments and intermediate persists corresponding to the input file that a particular peon is processing. Therefore, as the file size grows, memory consumption can grow without bound, eventually exhausting all available Java heap and making the ingestion task fail. The main ways to control this today are to manage the input file (i.e. divide it into smaller pieces) or to change the segment granularity (so fewer `Sinks` and `FireHydrants` are created), but these workarounds are not always convenient. One big reason this situation exists is that real-time ingestion and native batch ingestion share the same code path in the `AppenderatorImpl` class. We propose to split the appenderator into two classes, `RealTimeAppenderator` and `BatchAppenderator`, and to refactor `BatchAppenderator` so that data structures that are not required to be in memory can be garbage collected, thus removing the unbounded memory growth. This proposal would leave `RealTimeAppenderator` basically intact.
   
   ### Proposed changes
   
   We start by breaking up the current `AppenderatorImpl` into two new classes, `RealTimeAppenderator` and `BatchAppenderator`, both implementing the `Appenderator` interface and potentially deriving from a base class that gathers common code. `RealTimeAppenderator` is essentially the same code that exists today in `AppenderatorImpl` and will run unchanged for streaming (i.e. real-time) tasks. `BatchAppenderator` will be modified as follows.
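   The proposed split could be sketched roughly as follows. This is an illustrative Java sketch, not Druid's actual API; the interface, class members, and method names are simplified assumptions:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for Druid's Appenderator interface (hypothetical shape).
interface Appenderator {
    void add(String row);
    void persistAll();
}

// Base class gathering code common to both appenderators.
abstract class BaseAppenderator implements Appenderator {
    protected final List<String> sinks = new ArrayList<>();

    @Override
    public void add(String row) {
        sinks.add(row);
    }
}

// Streaming path: behavior unchanged; sinks must stay in memory so that
// real-time queries can still be served from them.
class RealTimeAppenderator extends BaseAppenderator {
    @Override
    public void persistAll() {
        // Persist hydrants but keep the in-memory structures alive.
    }
}

// Batch path: nothing queries the in-memory data after a persist, so all
// heap references can be dropped and garbage collected.
class BatchAppenderator extends BaseAppenderator {
    @Override
    public void persistAll() {
        // Write sinks to disk (omitted in this sketch), then release all
        // heap references so the structures become garbage collectable.
        sinks.clear();
    }
}

public class AppenderatorSplitSketch {
    public static void main(String[] args) {
        BatchAppenderator batch = new BatchAppenderator();
        batch.add("row-1");
        batch.add("row-2");
        batch.persistAll();
        System.out.println("in-memory sinks after persist: " + batch.sinks.size());
    }
}
```

The key design point is that only the batch subclass is allowed to discard its in-memory structures, because only streaming tasks need them for queries after a persist.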
   
   The main change for `BatchAppenderator` is straightforward to understand. First, code that exists only to support the real-time use case (such as the `VersionedIntervalTimeLine` and related code) can be removed. More importantly, at a high level, the main change is as follows. Today, there is code that persists hydrants when it detects (through a simple but effective memory-use model) that heap pressure has reached a certain level. After the hydrants are persisted, some heap memory is released back to the system. However, stress tests and code inspection reveal that not all memory of the hydrants is released. The consequence is that memory grows without bound in a single task as its number of segments/hydrants increases (the Appendix shows test results that illustrate this). The proposed change is thus to release *all* memory consumed by `Sinks` and `FireHydrants` after they are persisted. The code also makes sure that `Sinks` and `FireHydrants` are released in the merge phase as soon as they are used (`Sinks` and `FireHydrants` are recovered from their persisted files just in time when they are merged, on a `Sink`-by-`Sink` basis). We have already written code demonstrating that this is feasible; in our tests the memory savings are dramatic, and memory growth is minimal after this change (see the Appendix below for preliminary results).
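   The just-in-time recovery during the merge phase can be illustrated with a small sketch. The names below are hypothetical stand-ins for the real `Sink`/`FireHydrant` machinery, assumed only for illustration:

```java
import java.util.List;

// Hypothetical handle to a Sink whose hydrants live only on disk.
class PersistedSink {
    private final String path;

    PersistedSink(String path) {
        this.path = path;
    }

    // Stand-in for reading the hydrants back from their persisted files.
    List<String> reloadHydrants() {
        return List.of("hydrant@" + path);
    }
}

public class SinkBySinkMergeSketch {
    public static void main(String[] args) {
        // After the segment-creation phase, only the persisted locations are
        // kept on the heap, not the Sinks and FireHydrants themselves.
        List<PersistedSink> persisted = List.of(
            new PersistedSink("sink-0"),
            new PersistedSink("sink-1"));

        for (PersistedSink sink : persisted) {
            // Recover the hydrants just in time to merge this one Sink.
            List<String> hydrants = sink.reloadHydrants();
            System.out.println("merging " + hydrants);
            // The reloaded hydrants go out of scope here, so the garbage
            // collector can reclaim their memory before the next Sink merges.
        }
    }
}
```

Because at most one `Sink`'s worth of hydrants is resident at a time, peak heap usage during the merge phase stays roughly constant regardless of how many segments the task produced.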
   
   
   
   ### Rationale
   
   An alternative that we considered is not splitting the class but rather introducing a flag. However, this would make the code hard to read and hard to maintain, since the changes, though conceptually simple, are dramatic at the code level and would have to modify parts of the concurrency model used in streaming. Therefore we recommend the proposal to split the appenderators.
   
   The downside of splitting is that we now have a split code base for the `Appenderator` functionality. Initially it may make maintenance of the code slightly more difficult for people used to the old code, but eventually the advantages of the *separation of concerns* principle will kick in, in addition to the critical benefit of avoiding unbounded memory growth.
   
   ### Operational impact
   
   The code does not modify any external APIs, so no operational impact is expected (other than that the kind of out-of-memory issues addressed by this proposal will no longer happen). Nothing will be deprecated or removed by this change, and hence there is no migration path that cluster operators need to be aware of. All of the features introduced will be on by default, but given the magnitude of the change, a feature flag will be provided to roll back to the old behavior, with no code changes, in case of suspected bugs.
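   For example, the rollback switch could be exposed as a `tuningConfig` property. The flag name below is purely hypothetical, not a committed API:

```json
{
  "tuningConfig": {
    "type": "index_parallel",
    "useLegacyBatchProcessing": true
  }
}
```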
   
   ### Test plan 
   
   In addition to the usual unit-test and integration-test coverage, we have created a test file that demonstrates the unbounded memory growth, as well as the elimination of that growth with the proposed changes.
   
   ### Appendix
   #### Test design
   We created a synthetic CSV file designed to reflect a real-world scenario in which a Druid user's ingestion ran out of memory. The file has 1 million rows, each with roughly 200 columns. The data represents many years of events and the segment granularity is set to `DAY`. There are roughly 90-100 events per day, so in the end over 10,000 segments are created. We used a peon with 2G of heap for the runs. The partition type is `dynamic` and we used defaults in all other areas of the ingestion spec (such as the `tuningConfig`).
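   The spec used for the runs was along these lines. The data source name, paths, and timestamp column are illustrative placeholders, not the actual values from our tests:

```json
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "memory_growth_test",
      "timestampSpec": { "column": "timestamp", "format": "auto" },
      "granularitySpec": {
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE"
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": { "type": "local", "baseDir": "/data", "filter": "test.csv" },
      "inputFormat": { "type": "csv", "findColumnsFromHeader": true }
    },
    "tuningConfig": {
      "type": "index_parallel",
      "partitionsSpec": { "type": "dynamic" }
    }
  }
}
```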
   
   #### Test with current code (Apache/druid master)
   The following diagram (obtained by processing the garbage collection log with gcplot) shows the memory utilization during ingestion of the test file. You can see that the memory grows very quickly (both before and after GC) until it flattens and hits the 2G heap ceiling. This run ended in an out-of-memory error (OOM), so ingestion failed. Ingestion crashed in the "segment creation" phase of the `Appenderator`, never even making it to the "merge" phase. Other aspects of the garbage collection log (not shown here) show that about 50% of the time (the entire flat curve at the right of the graph) is actually spent just doing garbage collection and almost no actual work, until ingestion finally crashes.
   
   
![image](https://user-images.githubusercontent.com/19496586/117744836-3bcb1680-b1be-11eb-9a49-94b4ecfc00ee.png)
   
   #### Test with code incorporating the proposed changes
   This test is the same as the one above but with the new code (the code is not production ready yet, just complete enough to demonstrate the value). Now you see a very different picture. The "EMA" line is the exponential moving average of the heap utilization, and it clearly shows that memory usage is now very stable over both phases of ingestion and never gets anywhere close to the 2G heap limit. The left side of the graph with the "spikes" is the segment creation phase, where a spike appears each time a segment is persisted and all of its memory is released back. The flat part on the right-hand side is the merge phase, which clearly shows the effect of merging on a `Sink`-by-`Sink` basis and then releasing all the memory associated with the `Sink` and its `FireHydrants` after they have been merged.
   
   
![image](https://user-images.githubusercontent.com/19496586/117744924-6e750f00-b1be-11eb-8f1c-30654c8cb4a1.png)
   
   

