This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c7a03a47b Lower default maxRowsInMemory for realtime ingestion. (#13939)
1c7a03a47b is described below

commit 1c7a03a47be652853a9991f79716f69e0ad39bd4
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Mar 21 10:36:36 2023 -0700

    Lower default maxRowsInMemory for realtime ingestion. (#13939)
    
    * Lower default maxRowsInMemory for realtime ingestion.
    
    The thinking here is that for best ingestion throughput, we want
    intermediate persists to be as big as possible without using up all
    available memory. So, we rely mainly on maxBytesInMemory. The default
    maxRowsInMemory (1 million) is really just a safety: in case we have
    a large number of very small rows, we don't want to get overwhelmed
    by per-row overheads.
    
    However, maximum ingestion throughput isn't necessarily the primary
    goal for realtime ingestion. Query performance is also important. And
    because query performance is not as good on the in-memory dataset, it's
    helpful to keep it from growing too large. 150k strikes a reasonable
    balance: for a typical 5 million row segment, this limit triggers at
    most 33 persists (5,000,000 / 150,000 ≈ 33.3), which is an acceptable
    number of persists.
    
    * Update tests.
    
    * Update server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
    
    Co-authored-by: Kashif Faraz <[email protected]>
    
    * Fix test.
    
    * Fix link.
    
    ---------
    
    Co-authored-by: Kashif Faraz <[email protected]>
---
 .../indexing/kafka/KafkaIndexTaskTuningConfigTest.java    |  2 +-
 .../kafka/supervisor/KafkaSupervisorTuningConfigTest.java |  2 +-
 .../kinesis/KinesisIndexTaskTuningConfigTest.java         |  2 +-
 .../supervisor/KinesisSupervisorTuningConfigTest.java     |  2 +-
 .../org/apache/druid/msq/indexing/MSQTuningConfig.java    |  2 +-
 .../java/org/apache/druid/indexer/HadoopTuningConfig.java |  4 ++--
 .../common/index/RealtimeAppenderatorTuningConfig.java    |  2 +-
 .../org/apache/druid/indexing/common/task/IndexTask.java  |  2 +-
 .../druid/segment/indexing/RealtimeTuningConfig.java      |  4 ++--
 .../org/apache/druid/segment/indexing/TuningConfig.java   | 15 ++++++++++++++-
 .../druid/segment/indexing/RealtimeTuningConfigTest.java  |  2 +-
 11 files changed, 26 insertions(+), 13 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 6e3de31b75..5033929830 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -63,7 +63,7 @@ public class KafkaIndexTaskTuningConfigTest
 
     Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
-    Assert.assertEquals(1000000, config.getMaxRowsInMemory());
+    Assert.assertEquals(150000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
     Assert.assertNull(config.getMaxTotalRows());
     Assert.assertEquals(new Period("PT10M"), 
config.getIntermediatePersistPeriod());
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
index 5ac97b12c8..d0ad3cb088 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
@@ -59,7 +59,7 @@ public class KafkaSupervisorTuningConfigTest
 
     Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
-    Assert.assertEquals(1000000, config.getMaxRowsInMemory());
+    Assert.assertEquals(150000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT10M"), 
config.getIntermediatePersistPeriod());
     Assert.assertEquals(0, config.getMaxPendingPersists());
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index f283a7455f..2892ea51c3 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -69,7 +69,7 @@ public class KinesisIndexTaskTuningConfigTest
 
     Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
-    Assert.assertEquals(1000000, config.getMaxRowsInMemory());
+    Assert.assertEquals(150000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT10M"), 
config.getIntermediatePersistPeriod());
     Assert.assertEquals(0, config.getMaxPendingPersists());
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
index a5c48b35b6..8f7440f2d7 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
@@ -58,7 +58,7 @@ public class KinesisSupervisorTuningConfigTest
 
     Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
-    Assert.assertEquals(1000000, config.getMaxRowsInMemory());
+    Assert.assertEquals(150000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT10M"), 
config.getIntermediatePersistPeriod());
     Assert.assertEquals(0, config.getMaxPendingPersists());
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQTuningConfig.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQTuningConfig.java
index 6d499f4f0e..f97e250e1c 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQTuningConfig.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQTuningConfig.java
@@ -33,7 +33,7 @@ import java.util.Objects;
 public class MSQTuningConfig
 {
   /**
-   * Lower than {@link 
org.apache.druid.segment.indexing.TuningConfig#DEFAULT_MAX_ROWS_IN_MEMORY} to 
minimize the
+   * Lower than {@link 
org.apache.druid.segment.indexing.TuningConfig#DEFAULT_MAX_ROWS_IN_MEMORY_BATCH}
 to minimize the
    * impact of per-row overheads that are not accounted for by 
OnheapIncrementalIndex. For example: overheads
    * related to creating bitmaps during persist.
    *
diff --git 
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
 
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
index eefdbb1d38..88b8fbdc16 100644
--- 
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
+++ 
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
@@ -57,7 +57,7 @@ public class HadoopTuningConfig implements TuningConfig
         DEFAULT_INDEX_SPEC,
         DEFAULT_INDEX_SPEC,
         DEFAULT_APPENDABLE_INDEX,
-        DEFAULT_MAX_ROWS_IN_MEMORY,
+        DEFAULT_MAX_ROWS_IN_MEMORY_BATCH,
         0L,
         false,
         true,
@@ -141,7 +141,7 @@ public class HadoopTuningConfig implements TuningConfig
     this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists 
== null ?
                                             this.indexSpec : 
indexSpecForIntermediatePersists;
     this.maxRowsInMemory = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == 
null
-                                                      ? 
DEFAULT_MAX_ROWS_IN_MEMORY
+                                                      ? 
DEFAULT_MAX_ROWS_IN_MEMORY_BATCH
                                                       : maxRowsInMemoryCOMPAT 
: maxRowsInMemory;
     this.appendableIndexSpec = appendableIndexSpec == null ? 
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     // initializing this to 0, it will be lazily initialized to a value
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
index 8b11eddb7c..d91daa6d92 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
@@ -91,7 +91,7 @@ public class RealtimeAppenderatorTuningConfig implements 
AppenderatorConfig
   )
   {
     this.appendableIndexSpec = appendableIndexSpec == null ? 
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
-    this.maxRowsInMemory = maxRowsInMemory == null ? 
DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
+    this.maxRowsInMemory = maxRowsInMemory == null ? 
DEFAULT_MAX_ROWS_IN_MEMORY_REALTIME : maxRowsInMemory;
     // initializing this to 0, it will be lazily initialized to a value
     // @see #getMaxBytesInMemoryOrDefault()
     this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 1934f572d0..31acf6e6b6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -1381,7 +1381,7 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
     )
     {
       this.appendableIndexSpec = appendableIndexSpec == null ? 
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
-      this.maxRowsInMemory = maxRowsInMemory == null ? 
TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
+      this.maxRowsInMemory = maxRowsInMemory == null ? 
TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY_BATCH : maxRowsInMemory;
       // initializing this to 0, it will be lazily initialized to a value
       // @see #getMaxBytesInMemoryOrDefault()
       this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
 
b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
index 84061711b3..55ae04bd7a 100644
--- 
a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
+++ 
b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
@@ -60,7 +60,7 @@ public class RealtimeTuningConfig implements 
AppenderatorConfig
   {
     return new RealtimeTuningConfig(
         DEFAULT_APPENDABLE_INDEX,
-        DEFAULT_MAX_ROWS_IN_MEMORY,
+        DEFAULT_MAX_ROWS_IN_MEMORY_REALTIME,
         0L,
         DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK,
         DEFAULT_INTERMEDIATE_PERSIST_PERIOD,
@@ -129,7 +129,7 @@ public class RealtimeTuningConfig implements 
AppenderatorConfig
   )
   {
     this.appendableIndexSpec = appendableIndexSpec == null ? 
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
-    this.maxRowsInMemory = maxRowsInMemory == null ? 
DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
+    this.maxRowsInMemory = maxRowsInMemory == null ? 
DEFAULT_MAX_ROWS_IN_MEMORY_REALTIME : maxRowsInMemory;
     // initializing this to 0, it will be lazily initialized to a value
     // @see #getMaxBytesInMemoryOrDefault()
     this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java 
b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
index 266204520c..ba638c2c48 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
@@ -38,7 +38,20 @@ public interface TuningConfig
   AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new 
OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
-  int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
+
+  /**
+   * For batch ingestion, we want to maximize throughput by minimizing the 
number of incremental persists. The limit
+   * here is really a safety: in case we have a large number of very small 
rows, we don't want to get overwhelmed
+   * by per-row overheads. Mostly, we rely on the bytes limit {@link 
#getMaxBytesInMemory()}.
+   */
+  int DEFAULT_MAX_ROWS_IN_MEMORY_BATCH = 1_000_000;
+
+  /**
+   * For realtime ingestion, we want to balance ingestion throughput and query 
performance. Since queries on
+   * in-memory data are slower due to using {@link 
org.apache.druid.segment.incremental.IncrementalIndex}
+   * instead of {@link org.apache.druid.segment.QueryableIndex}, we cap the 
row count of in-memory data.
+   */
+  int DEFAULT_MAX_ROWS_IN_MEMORY_REALTIME = 150_000;
   boolean DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK = false;
   long DEFAULT_AWAIT_SEGMENT_AVAILABILITY_TIMEOUT_MILLIS = 0L;
 
diff --git 
a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
 
b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
index c240cc4d35..23432c9758 100644
--- 
a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
@@ -89,7 +89,7 @@ public class RealtimeTuningConfigTest
     Assert.assertEquals(new Period("PT10M"), 
config.getIntermediatePersistPeriod());
     Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec());
     Assert.assertEquals(0, config.getMaxPendingPersists());
-    Assert.assertEquals(1000000, config.getMaxRowsInMemory());
+    Assert.assertEquals(150000, config.getMaxRowsInMemory());
     Assert.assertEquals(0, config.getMergeThreadPriority());
     Assert.assertEquals(0, config.getPersistThreadPriority());
     Assert.assertEquals(new Period("PT10M"), config.getWindowPeriod());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to