cryptoe commented on code in PR #13271:
URL: https://github.com/apache/druid/pull/13271#discussion_r1008062118


##########
extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/cache/polling/OnHeapPollingCache.java:
##########
@@ -83,6 +84,19 @@ public List<K> getKeys(final V value)
     return listOfKeys;
   }
 
+  @Override
+  @SuppressWarnings("unchecked")
+  public long estimateHeapFootprint()
+  {
+    for (final Map.Entry<K, V> entry : immutableMap.entrySet()) {
+      if (!(entry.getKey() instanceof String) || !(entry.getValue() instanceof 
String)) {

Review Comment:
   We are iterating the map twice here, which I think can be avoided, no?
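
   For illustration, the type check and the size estimate could be folded into a single pass (a sketch only; `PER_ENTRY_OVERHEAD` and the per-character cost below are illustrative stand-ins for whatever `MapLookupExtractor.estimateHeapFootprint` actually charges, not the real constants):

   ```java
   import java.util.Map;

   public class SinglePassFootprint
   {
     // Illustrative per-entry overhead; not the actual constant used by Druid.
     private static final long PER_ENTRY_OVERHEAD = 48;

     // Single pass over the map: bail out on the first non-String key or value,
     // otherwise accumulate the estimate as we go.
     public static long estimateHeapFootprint(final Map<?, ?> map)
     {
       long size = 0;
       for (final Map.Entry<?, ?> entry : map.entrySet()) {
         if (!(entry.getKey() instanceof String) || !(entry.getValue() instanceof String)) {
           return 0;
         }
         size += PER_ENTRY_OVERHEAD
                 + 2L * ((String) entry.getKey()).length()
                 + 2L * ((String) entry.getValue()).length();
       }
       return size;
     }
   }
   ```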



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java:
##########
@@ -250,8 +242,8 @@ public FrameContext frameContext(QueryDefinition queryDef, int stageNumber)
         indexIO,
         dataSegmentProvider,
         WorkerMemoryParameters.compute(
-            Runtime.getRuntime().maxMemory(),
-            numWorkersInJvm,
+            computeAvailableHeapMemory(),

Review Comment:
   For each stage, we would call this method once, meaning we would iterate all the lookup key-value pairs for every stage.
   
   Should we store the value in a variable while initializing IndexerWorkerContext, or maybe create an init method, since the call might be slow?
   
   Maybe we can do the sizing calculation on the controller and then pass the size values to the workers; then each worker would not need to iterate the full lookup map. The downside is that we might work with stale values, which is the case even now, but the window is longer in the controller case.
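
   A memoized supplier is one way to pay the iteration cost only once per worker, regardless of stage count (a sketch; `MemoizedFootprint` is a hypothetical helper, not existing Druid code, and Guava's `Suppliers.memoize` would do the same job):

   ```java
   import java.util.function.Supplier;

   public class MemoizedFootprint
   {
     private final Supplier<Long> delegate;
     private volatile Long cached;

     public MemoizedFootprint(final Supplier<Long> delegate)
     {
       this.delegate = delegate;
     }

     // First call pays the cost of the (possibly slow) computation;
     // every later stage reuses the cached value.
     public long get()
     {
       Long value = cached;
       if (value == null) {
         synchronized (this) {
           if (cached == null) {
             cached = delegate.get();
           }
           value = cached;
         }
       }
       return value;
     }
   }
   ```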



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java:
##########
@@ -276,6 +268,62 @@ public Bouncer processorBouncer()
     return injector.getInstance(Bouncer.class);
   }
 
+  /**
+   * Number of workers that may run in the current JVM, including the current worker.
+   */
+  private int computeNumWorkersInJvm()
+  {
+    if (toolbox.getAppenderatorsManager() instanceof UnifiedIndexerAppenderatorsManager) {
+      // CliIndexer
+      return injector.getInstance(WorkerConfig.class).getCapacity();
+    } else {
+      // CliPeon
+      return 1;
+    }
+  }
+
+  /**
+   * Amount of memory available for our usage.
+   */
+  private long computeAvailableHeapMemory()
+  {
+    return Runtime.getRuntime().maxMemory() - computeTotalLookupFootprint();
+  }
+
+  /**
+   * Total estimated lookup footprint. Obtained by calling {@link LookupExtractor#estimateHeapFootprint()} on
+   * all available lookups.
+   */
+  private long computeTotalLookupFootprint()
+  {
+    // Subtract memory taken up by lookups. Correctness of this operation depends on lookups being loaded *before*
+    // we create this instance. Luckily, this is the typical mode of operation, since by default
+    // druid.lookup.enableLookupSyncOnStartup = true.

Review Comment:
   Should we add a note about this in the memory tuning section of the MSQ docs?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java:
##########
@@ -276,6 +268,62 @@ public Bouncer processorBouncer()
     return injector.getInstance(Bouncer.class);
   }
 
+  /**
+   * Number of workers that may run in the current JVM, including the current worker.
+   */
+  private int computeNumWorkersInJvm()
+  {
+    if (toolbox.getAppenderatorsManager() instanceof UnifiedIndexerAppenderatorsManager) {
+      // CliIndexer
+      return injector.getInstance(WorkerConfig.class).getCapacity();
+    } else {
+      // CliPeon
+      return 1;
+    }
+  }
+
+  /**
+   * Amount of memory available for our usage.
+   */
+  private long computeAvailableHeapMemory()
+  {
+    return Runtime.getRuntime().maxMemory() - computeTotalLookupFootprint();
+  }
+
+  /**
+   * Total estimated lookup footprint. Obtained by calling {@link LookupExtractor#estimateHeapFootprint()} on
+   * all available lookups.
+   */
+  private long computeTotalLookupFootprint()
+  {
+    // Subtract memory taken up by lookups. Correctness of this operation depends on lookups being loaded *before*
+    // we create this instance. Luckily, this is the typical mode of operation, since by default
+    // druid.lookup.enableLookupSyncOnStartup = true.

Review Comment:
   Also, should we just fetch this information from the Coordinator in another PR? Instead of each worker calculating the size, the Coordinator should be able to give us the size as part of the lookup metadata. WDYT?
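
   Something like the following shape could carry the size from the Coordinator to workers (purely hypothetical: `LookupSizeReport` does not exist in Druid, and the actual lookup metadata classes may look quite different):

   ```java
   // Hypothetical value object: a per-lookup size report the Coordinator
   // could attach to lookup metadata, so workers skip the map scan entirely.
   public class LookupSizeReport
   {
     private final String lookupName;
     private final long estimatedHeapFootprint;

     public LookupSizeReport(final String lookupName, final long estimatedHeapFootprint)
     {
       this.lookupName = lookupName;
       this.estimatedHeapFootprint = estimatedHeapFootprint;
     }

     public String getLookupName()
     {
       return lookupName;
     }

     public long getEstimatedHeapFootprint()
     {
       return estimatedHeapFootprint;
     }
   }
   ```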



##########
extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/cache/polling/OnHeapPollingCache.java:
##########
@@ -83,6 +84,19 @@ public List<K> getKeys(final V value)
     return listOfKeys;
   }
 
+  @Override
+  @SuppressWarnings("unchecked")
+  public long estimateHeapFootprint()
+  {
+    for (final Map.Entry<K, V> entry : immutableMap.entrySet()) {
+      if (!(entry.getKey() instanceof String) || !(entry.getValue() instanceof String)) {
+        return 0;
+      }
+    }
+
+    return MapLookupExtractor.estimateHeapFootprint((Map<String, String>) immutableMap);

Review Comment:
   IMHO, instead of the monitor which runs every 10 minutes, the place where we are updating the cache should do the running size calculation. Then this method just becomes a getter on top of that variable.
   
   We can do that as part of another PR as well.
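
   A running total maintained at cache-population time could look roughly like this (a sketch under the assumption that the cache is rebuilt wholesale on each poll, as the immutable map in `OnHeapPollingCache` suggests; the class and per-entry cost are illustrative, not Druid code):

   ```java
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicLong;

   public class RunningFootprintCache
   {
     private static final long PER_ENTRY_OVERHEAD = 48; // illustrative constant

     private volatile Map<String, String> map = Map.of();
     private final AtomicLong footprint = new AtomicLong();

     // Compute the footprint once, at the moment the new cache is swapped in,
     // instead of re-scanning the map on every estimateHeapFootprint() call.
     public void update(final Map<String, String> newMap)
     {
       long size = 0;
       for (final Map.Entry<String, String> entry : newMap.entrySet()) {
         size += PER_ENTRY_OVERHEAD
                 + 2L * entry.getKey().length()
                 + 2L * entry.getValue().length();
       }
       map = newMap;
       footprint.set(size);
     }

     // Now just a getter on the maintained total, as suggested above.
     public long estimateHeapFootprint()
     {
       return footprint.get();
     }
   }
   ```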



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

