loquisgon opened a new issue #11231: URL: https://github.com/apache/druid/issues/11231
### Motivation

Currently native batch ingestion has no guardrails with respect to its memory consumption. Native batch internally keeps creating data structures (such as `Sinks` and `FireHydrants`) roughly in proportion to the number of segments and intermediate persists corresponding to the input file that a particular peon is processing. Therefore, as the file size grows, memory consumption can grow unbounded, eventually exhausting all available Java heap and making the ingestion task fail. The main ways to control this today are to manage the input file (i.e. divide it into smaller pieces) or to change the segment granularity (so fewer `Sinks` and `FireHydrants` are created), but these workarounds are not always convenient. One big reason this situation exists is that real-time ingestion and native batch ingestion share the same code path in the `AppenderatorImpl` class. We propose to split the appenderator class into two classes, `RealTimeAppenderator` and `BatchAppenderator`, and to refactor `BatchAppenderator` so that data structures that are not required to be in memory can be garbage collected, thus removing the unbounded memory growth. This proposal would leave `RealTimeAppenderator` essentially intact, the same as before.

### Proposed changes

We start by breaking up the current `AppenderatorImpl` into two new classes, `RealTimeAppenderator` and `BatchAppenderator`, both implementing the `Appenderator` interface and potentially derived from a base class that gathers common code. `RealTimeAppenderator` is basically the same code that exists today in `AppenderatorImpl`, and it will run unchanged for streaming (i.e. real-time) tasks. `BatchAppenderator` will be modified as follows. The main change for `BatchAppenderator` is actually straightforward to understand. First, unnecessary code that exists only to support the real-time use case can be removed (such as the `VersionedIntervalTimeline` and other code).
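The proposed split can be sketched as follows. Only the names `Appenderator`, `RealTimeAppenderator`, and `BatchAppenderator` come from this proposal; the hypothetical `BaseAppenderator`, the simplified method signatures, and the row counting are illustrative assumptions, not the actual Druid code:

```java
// Minimal sketch of the proposed class split, assuming a simplified
// Appenderator interface. Real Druid signatures differ.
interface Appenderator {
    void add(String segmentId, String row);
    int persistAll(); // returns number of rows persisted (simplified)
}

// Hypothetical base class gathering code common to both implementations,
// e.g. per-segment row bookkeeping.
abstract class BaseAppenderator implements Appenderator {
    protected int pendingRows = 0;

    @Override
    public void add(String segmentId, String row) {
        pendingRows++;
    }
}

// Streaming variant: keeps today's AppenderatorImpl behavior, including the
// structures (such as the VersionedIntervalTimeline) that real-time needs.
class RealTimeAppenderator extends BaseAppenderator {
    @Override
    public int persistAll() {
        // In real time, in-memory state must survive the persist so that
        // queries can still be answered; this sketch just reports the count.
        return pendingRows;
    }
}

// Batch variant: after persisting, nothing needs to stay on-heap, so all
// per-sink state is dropped and becomes eligible for garbage collection.
class BatchAppenderator extends BaseAppenderator {
    @Override
    public int persistAll() {
        int persisted = pendingRows;
        pendingRows = 0; // release everything; GC can reclaim it
        return persisted;
    }
}
```

The difference in `persistAll` is the crux: the batch variant can discard its on-heap state entirely after a persist, while the real-time variant cannot.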
More importantly, at a high level the main change is as follows. Today, there is code that persists hydrants when it detects (through a simple but effective memory-use model) that heap pressure has reached a certain level. After the hydrants are persisted, some heap memory is released back to the system. However, stress tests and code inspection reveal that not all memory of the hydrants is completely released. The consequence is that memory in a single task grows unbounded as its number of segments/hydrants increases (in the appendix we show results of our tests that illustrate this). The proposed change is thus to release *all* memory being consumed by `Sinks` and `FireHydrants` after they are persisted. The code also makes sure that `Sinks` and `FireHydrants` are released in the merge phase as soon as they are used (`Sinks` and `FireHydrants` are recovered from their persisted files just in time when they are merged, on a `Sink` by `Sink` basis). We have already written code that demonstrates this is feasible, and in our tests the memory savings are dramatic: memory growth is minimal after this change (see the appendix below for preliminary results).

### Rationale

An alternative that we considered is not splitting the class but rather introducing a flag. However, this would make the code hard to read and also hard to maintain, since the changes, though simple to understand conceptually, are dramatic at the code level and have to modify some areas of the concurrency model used in streaming. Therefore we recommend the proposal to split the appenderators. The downside of splitting is that we then have a split code base for the `Appenderator` functionality. Initially this may make maintenance slightly more difficult for people used to the old code, but eventually the advantages of the *separation of concerns* principle will kick in, in addition to the critical benefit of avoiding unbounded memory growth.
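The persist-then-release and merge-sink-by-sink lifecycle described above can be illustrated with a small, self-contained sketch. `Sink` and `FireHydrant` are real Druid concepts, but everything in this class is a simplified stand-in: the `disk` map plays the role of the persisted hydrant files, and rows are plain strings.

```java
import java.util.*;

// Illustrative sketch of the proposed batch persist/merge lifecycle.
class BatchLifecycleSketch {
    // segment id -> rows currently on-heap (stand-in for a Sink's hydrants)
    private final Map<String, List<String>> inMemory = new HashMap<>();
    // segment id -> persisted rows (stand-in for files written at persist time)
    private final Map<String, List<String>> disk = new LinkedHashMap<>();

    void add(String segmentId, String row) {
        inMemory.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(row);
    }

    // Persist a sink and release *all* of its on-heap state so it can be
    // garbage collected -- the key change this proposal makes for batch.
    void persist(String segmentId) {
        disk.put(segmentId, new ArrayList<>(inMemory.get(segmentId)));
        inMemory.remove(segmentId); // nothing retained on-heap after persist
    }

    // Merge sink by sink: recover each sink just in time, merge it, then
    // drop it before touching the next one, so at most one sink is on-heap.
    List<String> mergeAll() {
        List<String> merged = new ArrayList<>();
        for (Iterator<List<String>> it = disk.values().iterator(); it.hasNext(); ) {
            merged.addAll(it.next()); // just-in-time "reload" and merge
            it.remove();              // release immediately after merging
        }
        return merged;
    }

    int onHeapSinkCount() { return inMemory.size(); }
}
```

The point of the sketch is the invariant it enforces: after every `persist`, the on-heap footprint of that sink drops to zero, and during `mergeAll` only one sink's worth of data is ever held at a time.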
### Operational impact

The code does not modify any external APIs, so no operational impact is expected (other than that the kind of out-of-memory issues addressed by this proposal will no longer happen). Nothing is deprecated or removed by this change, and hence there is no migration path that cluster operators need to be aware of. All of the features introduced will be on by default, but given the magnitude of the change a feature flag will be provided to roll back the behavior, with no code changes, in case of suspected bugs.

### Test plan

In addition to the usual unit test and integration test coverage, we have created a test file that demonstrates the unbounded memory growth as well as the elimination of that growth with the proposed changes.

### Appendix

#### Test design

We created a synthetic CSV file designed to reflect a real-case scenario where a Druid user's ingestion ran out of memory. The file has 1 million rows, each row with roughly 200 columns. The data represents many years of events and the segment granularity is set to `DAY`. There are roughly 90-100 events per day, so in the end over 10,000 segments are created. We used a peon with 2G of heap for the runs. The partition type is `dynamic` and we used defaults in all other areas of the ingestion spec (such as the `tuningConfig`).

#### Test with current code (apache/druid master)

The following diagram (obtained by processing the garbage collection log with gcplot) shows the memory utilization of the ingestion of the test file. You can see that the memory grows very quickly (both before and after GC) until it flattens and hits the 2G heap ceiling. This run actually ended in an out-of-memory error (OOM), so ingestion failed. Ingestion crashed in the "segment creation" phase of the `Appenderator`, not even making it to the "merge" phase.
Other aspects of the garbage collection log (not shown here) show that about 50% of the time (all of the flat curve at the right of the graph) is actually spent just doing garbage collection and almost no actual work (until ingestion finally crashes).

#### Test with code incorporating the proposed changes

This test is the same as the one above but with the new code (the code is not production ready yet, just enough to demonstrate the value). Now you see a very different picture. The "EMA" line is the exponential moving average of the heap utilization, and it clearly shows that memory usage is now very stable over both phases of ingestion and does not get even close to the 2G heap limit. (The left side of the graph with the "spikes" is the segment creation phase, where spikes are created when a segment gets persisted and all its memory is released back. The flat part on the right-hand side is the merge phase, which clearly shows the effect of merging on a `Sink` by `Sink` basis and then releasing all the memory associated with the `Sink` and its `FireHydrants` after they have been merged.)
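As a sanity check on the test-design numbers above: at 90-100 events per day, 1 million rows span roughly 10,000-11,111 days, so with `DAY` segment granularity (one segment per day) on the order of 10,000+ segments is exactly what one should expect. A trivial arithmetic sketch, using only figures stated in this proposal:

```java
// Cross-check of the appendix test setup: 1,000,000 rows at roughly
// 90-100 events per day, one DAY segment per day of data.
class SegmentCountEstimate {
    static long daysCovered(long totalRows, long eventsPerDay) {
        return totalRows / eventsPerDay;
    }
}
// 1,000,000 / 100 = 10,000 days; 1,000,000 / 90 = 11,111 days,
// i.e. over 10,000 DAY segments, matching the figure in the appendix.
```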
