This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f5bfccc Fix maxBytesInMemory for heap overhead of all sinks and
hydrants check (#10891)
f5bfccc is described below
commit f5bfccc7208f29e352808d4de93ab64e8134cd50
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Thu Feb 18 21:48:57 2021 -0800
Fix maxBytesInMemory for heap overhead of all sinks and hydrants check
(#10891)
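* fix maxBytesInMemory
* fix maxBytesInMemory check
* fix maxBytesInMemory check
* fix test
Summary of the change: the "still over maxBytesInMemory even after persisting"
check, and the accounting of memory still held by memory-mapped hydrants, are
removed from persistAll() and now run in the incremental-persist path just
before persistAll() is called. The bytes about to be persisted are summed from
getBytesInMemory() of every sink. The limits and row counts in AppenderatorTest
are adjusted to match.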
---
.../realtime/appenderator/AppenderatorImpl.java | 76 ++++++++++++----------
.../realtime/appenderator/AppenderatorTest.java | 8 +--
2 files changed, 45 insertions(+), 39 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 5d7d8e0..92590ea 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -332,6 +332,47 @@ public class AppenderatorImpl implements Appenderator
if (allowIncrementalPersists) {
// persistAll clears rowsCurrentlyInMemory, no need to update it.
log.info("Flushing in-memory data to disk because %s.",
String.join(",", persistReasons));
+
+ long bytesPersisted = 0L;
+ for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet())
{
+ final Sink sinkEntry = entry.getValue();
+ if (sinkEntry != null) {
+ bytesPersisted += sinkEntry.getBytesInMemory();
+ if (sinkEntry.swappable()) {
+ // After swapping the sink, we use memory mapped segment
instead. However, the memory mapped segment still consumes memory.
+ // These memory mapped segments are held in memory throughout
the ingestion phase and permanently add to the bytesCurrentlyInMemory
+ int memoryStillInUse =
calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant());
+ bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
+ }
+ }
+ }
+
+ if (!skipBytesInMemoryOverheadCheck && bytesCurrentlyInMemory.get() -
bytesPersisted > maxBytesTuningConfig) {
+ // We are still over maxBytesTuningConfig even after persisting.
+ // This means that we ran out of all available memory to ingest (due
to overheads created as part of ingestion)
+ final String alertMessage = StringUtils.format(
+ "Task has exceeded safe estimated heap usage limits, failing "
+ + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows:
[%d])",
+ sinks.size(),
+ sinks.values().stream().mapToInt(Iterables::size).sum(),
+ getTotalRowCount()
+ );
+ final String errorMessage = StringUtils.format(
+ "%s.\nThis can occur when the overhead from too many
intermediary segment persists becomes to "
+ + "great to have enough space to process additional input rows.
This check, along with metering the overhead "
+ + "of these objects to factor into the 'maxBytesInMemory'
computation, can be disabled by setting "
+ + "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing
so might allow the task to naturally encounter "
+ + "a 'java.lang.OutOfMemoryError'). Alternatively,
'maxBytesInMemory' can be increased which will cause an "
+ + "increase in heap footprint, but will allow for more
intermediary segment persists to occur before "
+ + "reaching this condition.",
+ alertMessage
+ );
+ log.makeAlert(alertMessage)
+ .addData("dataSource", schema.getDataSource())
+ .emit();
+ throw new RuntimeException(errorMessage);
+ }
+
Futures.addCallback(
persistAll(committerSupplier == null ? null :
committerSupplier.get()),
new FutureCallback<Object>()
@@ -513,7 +554,6 @@ public class AppenderatorImpl implements Appenderator
public ListenableFuture<Object> persistAll(@Nullable final Committer
committer)
{
throwPersistErrorIfExists();
- long bytesInMemoryBeforePersist = bytesCurrentlyInMemory.get();
final Map<String, Integer> currentHydrants = new HashMap<>();
final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist =
new ArrayList<>();
int numPersistedRows = 0;
@@ -539,16 +579,9 @@ public class AppenderatorImpl implements Appenderator
}
if (sink.swappable()) {
- // After swapping the sink, we use memory mapped segment instead.
However, the memory mapped segment still consumes memory.
- // These memory mapped segments are held in memory throughout the
ingestion phase and permanently add to the bytesCurrentlyInMemory
- int memoryStillInUse =
calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant());
- bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
-
indexesToPersist.add(Pair.of(sink.swap(), identifier));
-
}
}
-
log.debug("Submitting persist runnable for dataSource[%s]",
schema.getDataSource());
final Object commitMetadata = committer == null ? null :
committer.getMetadata();
@@ -638,33 +671,6 @@ public class AppenderatorImpl implements Appenderator
log.info("Persisted rows[%,d] and bytes[%,d]", numPersistedRows,
bytesPersisted);
- // bytesCurrentlyInMemory can change while persisting due to concurrent
ingestion.
- // Hence, we use bytesInMemoryBeforePersist to determine the change of
this persist
- if (!skipBytesInMemoryOverheadCheck && bytesInMemoryBeforePersist -
bytesPersisted > maxBytesTuningConfig) {
- // We are still over maxBytesTuningConfig even after persisting.
- // This means that we ran out of all available memory to ingest (due to
overheads created as part of ingestion)
- final String alertMessage = StringUtils.format(
- "Task has exceeded safe estimated heap usage limits, failing "
- + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])",
- sinks.size(),
- sinks.values().stream().mapToInt(Iterables::size).sum(),
- getTotalRowCount()
- );
- final String errorMessage = StringUtils.format(
- "%s.\nThis can occur when the overhead from too many intermediary
segment persists becomes to "
- + "great to have enough space to process additional input rows. This
check, along with metering the overhead "
- + "of these objects to factor into the 'maxBytesInMemory'
computation, can be disabled by setting "
- + "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so
might allow the task to naturally encounter "
- + "a 'java.lang.OutOfMemoryError'). Alternatively,
'maxBytesInMemory' can be increased which will cause an "
- + "increase in heap footprint, but will allow for more intermediary
segment persists to occur before "
- + "reaching this condition.",
- alertMessage
- );
- log.makeAlert(alertMessage)
- .addData("dataSource", schema.getDataSource())
- .emit();
- throw new RuntimeException(errorMessage);
- }
return future;
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
index a6c74e6..0c1285d 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
@@ -257,7 +257,7 @@ public class AppenderatorTest extends
InitializedNullHandlingTest
@Test
public void testMaxBytesInMemory() throws Exception
{
- try (final AppenderatorTester tester = new AppenderatorTester(100, 10000,
true)) {
+ try (final AppenderatorTester tester = new AppenderatorTester(100, 15000,
true)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
@@ -297,7 +297,7 @@ public class AppenderatorTest extends
InitializedNullHandlingTest
);
// We do multiple more adds to the same sink to cause persist.
- for (int i = 0; i < 26; i++) {
+ for (int i = 0; i < 53; i++) {
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1),
committerSupplier);
}
sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
@@ -333,7 +333,7 @@ public class AppenderatorTest extends
InitializedNullHandlingTest
);
// We do multiple more adds to the same sink to cause persist.
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 31; i++) {
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1),
committerSupplier);
}
// currHydrant size is 0 since we just persist all indexes to disk.
@@ -363,7 +363,7 @@ public class AppenderatorTest extends
InitializedNullHandlingTest
@Test(expected = RuntimeException.class)
public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
{
- try (final AppenderatorTester tester = new AppenderatorTester(100, 10,
true)) {
+ try (final AppenderatorTester tester = new AppenderatorTester(100, 5180,
true)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]