This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f5bfccc  Fix maxBytesInMemory for heap overhead of all sinks and 
hydrants check (#10891)
f5bfccc is described below

commit f5bfccc7208f29e352808d4de93ab64e8134cd50
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Thu Feb 18 21:48:57 2021 -0800

    Fix maxBytesInMemory for heap overhead of all sinks and hydrants check 
(#10891)
    
    * fix maxBytesInMemory
    
    * fix maxBytesInMemory check
    
    * fix maxBytesInMemory check
    
    * fix test
---
 .../realtime/appenderator/AppenderatorImpl.java    | 76 ++++++++++++----------
 .../realtime/appenderator/AppenderatorTest.java    |  8 +--
 2 files changed, 45 insertions(+), 39 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 5d7d8e0..92590ea 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -332,6 +332,47 @@ public class AppenderatorImpl implements Appenderator
       if (allowIncrementalPersists) {
         // persistAll clears rowsCurrentlyInMemory, no need to update it.
         log.info("Flushing in-memory data to disk because %s.", 
String.join(",", persistReasons));
+
+        long bytesPersisted = 0L;
+        for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) 
{
+          final Sink sinkEntry = entry.getValue();
+          if (sinkEntry != null) {
+            bytesPersisted += sinkEntry.getBytesInMemory();
+            if (sinkEntry.swappable()) {
+              // After swapping the sink, we use memory mapped segment 
instead. However, the memory mapped segment still consumes memory.
+              // These memory mapped segments are held in memory throughout 
the ingestion phase and permanently add to the bytesCurrentlyInMemory
+              int memoryStillInUse = 
calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant());
+              bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
+            }
+          }
+        }
+
+        if (!skipBytesInMemoryOverheadCheck && bytesCurrentlyInMemory.get() - 
bytesPersisted > maxBytesTuningConfig) {
+          // We are still over maxBytesTuningConfig even after persisting.
+          // This means that we ran out of all available memory to ingest (due 
to overheads created as part of ingestion)
+          final String alertMessage = StringUtils.format(
+              "Task has exceeded safe estimated heap usage limits, failing "
+              + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: 
[%d])",
+              sinks.size(),
+              sinks.values().stream().mapToInt(Iterables::size).sum(),
+              getTotalRowCount()
+          );
+          final String errorMessage = StringUtils.format(
+              "%s.\nThis can occur when the overhead from too many 
intermediary segment persists becomes to "
+              + "great to have enough space to process additional input rows. 
This check, along with metering the overhead "
+              + "of these objects to factor into the 'maxBytesInMemory' 
computation, can be disabled by setting "
+              + "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing 
so might allow the task to naturally encounter "
+              + "a 'java.lang.OutOfMemoryError'). Alternatively, 
'maxBytesInMemory' can be increased which will cause an "
+              + "increase in heap footprint, but will allow for more 
intermediary segment persists to occur before "
+              + "reaching this condition.",
+              alertMessage
+          );
+          log.makeAlert(alertMessage)
+             .addData("dataSource", schema.getDataSource())
+             .emit();
+          throw new RuntimeException(errorMessage);
+        }
+
         Futures.addCallback(
             persistAll(committerSupplier == null ? null : 
committerSupplier.get()),
             new FutureCallback<Object>()
@@ -513,7 +554,6 @@ public class AppenderatorImpl implements Appenderator
   public ListenableFuture<Object> persistAll(@Nullable final Committer 
committer)
   {
     throwPersistErrorIfExists();
-    long bytesInMemoryBeforePersist = bytesCurrentlyInMemory.get();
     final Map<String, Integer> currentHydrants = new HashMap<>();
     final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = 
new ArrayList<>();
     int numPersistedRows = 0;
@@ -539,16 +579,9 @@ public class AppenderatorImpl implements Appenderator
       }
 
       if (sink.swappable()) {
-        // After swapping the sink, we use memory mapped segment instead. 
However, the memory mapped segment still consumes memory.
-        // These memory mapped segments are held in memory throughout the 
ingestion phase and permanently add to the bytesCurrentlyInMemory
-        int memoryStillInUse = 
calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant());
-        bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
-
         indexesToPersist.add(Pair.of(sink.swap(), identifier));
-
       }
     }
-
     log.debug("Submitting persist runnable for dataSource[%s]", 
schema.getDataSource());
 
     final Object commitMetadata = committer == null ? null : 
committer.getMetadata();
@@ -638,33 +671,6 @@ public class AppenderatorImpl implements Appenderator
 
     log.info("Persisted rows[%,d] and bytes[%,d]", numPersistedRows, 
bytesPersisted);
 
-    // bytesCurrentlyInMemory can change while persisting due to concurrent 
ingestion.
-    // Hence, we use bytesInMemoryBeforePersist to determine the change of 
this persist
-    if (!skipBytesInMemoryOverheadCheck && bytesInMemoryBeforePersist - 
bytesPersisted > maxBytesTuningConfig) {
-      // We are still over maxBytesTuningConfig even after persisting.
-      // This means that we ran out of all available memory to ingest (due to 
overheads created as part of ingestion)
-      final String alertMessage = StringUtils.format(
-          "Task has exceeded safe estimated heap usage limits, failing "
-          + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])",
-          sinks.size(),
-          sinks.values().stream().mapToInt(Iterables::size).sum(),
-          getTotalRowCount()
-      );
-      final String errorMessage = StringUtils.format(
-          "%s.\nThis can occur when the overhead from too many intermediary 
segment persists becomes to "
-          + "great to have enough space to process additional input rows. This 
check, along with metering the overhead "
-          + "of these objects to factor into the 'maxBytesInMemory' 
computation, can be disabled by setting "
-          + "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so 
might allow the task to naturally encounter "
-          + "a 'java.lang.OutOfMemoryError'). Alternatively, 
'maxBytesInMemory' can be increased which will cause an "
-          + "increase in heap footprint, but will allow for more intermediary 
segment persists to occur before "
-          + "reaching this condition.",
-          alertMessage
-      );
-      log.makeAlert(alertMessage)
-         .addData("dataSource", schema.getDataSource())
-         .emit();
-      throw new RuntimeException(errorMessage);
-    }
     return future;
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
index a6c74e6..0c1285d 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
@@ -257,7 +257,7 @@ public class AppenderatorTest extends 
InitializedNullHandlingTest
   @Test
   public void testMaxBytesInMemory() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(100, 10000, 
true)) {
+    try (final AppenderatorTester tester = new AppenderatorTester(100, 15000, 
true)) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -297,7 +297,7 @@ public class AppenderatorTest extends 
InitializedNullHandlingTest
       );
 
       // We do multiple more adds to the same sink to cause persist.
-      for (int i = 0; i < 26; i++) {
+      for (int i = 0; i < 53; i++) {
         appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), 
committerSupplier);
       }
       sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
@@ -333,7 +333,7 @@ public class AppenderatorTest extends 
InitializedNullHandlingTest
       );
 
       // We do multiple more adds to the same sink to cause persist.
-      for (int i = 0; i < 5; i++) {
+      for (int i = 0; i < 31; i++) {
         appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), 
committerSupplier);
       }
       // currHydrant size is 0 since we just persist all indexes to disk.
@@ -363,7 +363,7 @@ public class AppenderatorTest extends 
InitializedNullHandlingTest
   @Test(expected = RuntimeException.class)
   public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
   {
-    try (final AppenderatorTester tester = new AppenderatorTester(100, 10, 
true)) {
+    try (final AppenderatorTester tester = new AppenderatorTester(100, 5180, 
true)) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to