jon-wei commented on a change in pull request #8201: Simple memory allocation for CliIndexer tasks
URL: https://github.com/apache/incubator-druid/pull/8201#discussion_r309417711
##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
##########
@@ -196,4 +236,288 @@ public boolean shouldTaskMakeNodeAnnouncements()
{
return false;
}
+
+ @Override
+ public void shutdown()
+ {
+ if (mergeExecutor != null) {
+ mergeExecutor.shutdownNow();
+ }
+ }
+
+ private AppenderatorConfig rewriteAppenderatorConfigMemoryLimits(AppenderatorConfig baseConfig)
+ {
+ long perWorkerLimit = workerConfig.getGlobalIngestionHeapLimitBytes() / workerConfig.getCapacity();
+ return new MemoryParameterOverridingAppenderatorConfig(baseConfig, perWorkerLimit);
+ }
+
+ /**
+ * This is a wrapper around AppenderatorConfig that overrides the `maxRowsInMemory` and `maxBytesInMemory`
+ * parameters. Row-based limits are disabled by setting `maxRowsInMemory` to an essentially unlimited value.
+ * `maxBytesInMemory` is overridden with the provided value.
+ */
+ private static class MemoryParameterOverridingAppenderatorConfig implements AppenderatorConfig
+ {
+ private final AppenderatorConfig baseConfig;
+ private final long newMaxBytesInMemory;
+
+ public MemoryParameterOverridingAppenderatorConfig(
+ AppenderatorConfig baseConfig,
+ long newMaxBytesInMemory
+ )
+ {
+ this.baseConfig = baseConfig;
+ this.newMaxBytesInMemory = newMaxBytesInMemory;
+ }
+
+ @Override
+ public boolean isReportParseExceptions()
+ {
+ return baseConfig.isReportParseExceptions();
+ }
+
+ @Override
+ public int getMaxRowsInMemory()
+ {
+ return Integer.MAX_VALUE; // unlimited, rely on maxBytesInMemory instead
+ }
+
+ @Override
+ public long getMaxBytesInMemory()
+ {
+ return newMaxBytesInMemory;
+ }
+
+ @Override
+ public int getMaxPendingPersists()
+ {
+ return baseConfig.getMaxPendingPersists();
+ }
+
+ @Nullable
+ @Override
+ public Integer getMaxRowsPerSegment()
+ {
+ return baseConfig.getMaxRowsPerSegment();
+ }
+
+ @Nullable
+ @Override
+ public Long getMaxTotalRows()
+ {
+ return baseConfig.getMaxTotalRows();
+ }
+
+ @Override
+ public Period getIntermediatePersistPeriod()
+ {
+ return baseConfig.getIntermediatePersistPeriod();
+ }
+
+ @Override
+ public IndexSpec getIndexSpec()
+ {
+ return baseConfig.getIndexSpec();
+ }
+
+ @Override
+ public IndexSpec getIndexSpecForIntermediatePersists()
+ {
+ return baseConfig.getIndexSpecForIntermediatePersists();
+ }
+
+ @Override
+ public File getBasePersistDirectory()
+ {
+ return baseConfig.getBasePersistDirectory();
+ }
+
+ @Nullable
+ @Override
+ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
+ {
+ return baseConfig.getSegmentWriteOutMediumFactory();
+ }
+ }
+
+ private IndexMerger wrapIndexMerger(IndexMerger baseMerger)
+ {
+ return new LimitedPoolIndexMerger(baseMerger, mergeExecutor);
+ }
+
+
+ /**
+ * This wrapper around IndexMerger limits concurrent calls to the merge/persist methods used by AppenderatorImpl
+ * with a shared executor service. Merge/persist methods that are not used by AppenderatorImpl will throw
+ * an exception if called.
+ */
+ public static class LimitedPoolIndexMerger implements IndexMerger
Review comment:
Hm, I think it could make sense to prioritize, but I'm not sure what the
criteria for prioritization would be right now. For now, I think it's fine
to let them process in the order of queue entry.
It's only used for this fairly specific purpose right now. If we find a more
general use for it, I think it should be moved to a separate file at that
point (and the other methods should be allowed to be called).
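
To illustrate what "the order of queue entry" means in practice, here is a
minimal sketch. It assumes the shared executor is a fixed-size thread pool
created with `Executors.newFixedThreadPool`, which queues pending work in an
unbounded FIFO queue; whether `mergeExecutor` in this PR is built that way is
an assumption, and the class and method names below are hypothetical, not
part of the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; it does not use any Druid types.
class FifoLimitedPoolSketch
{
  // Submits taskCount tasks to a pool of poolSize threads and returns
  // the task ids in the order in which the tasks actually ran.
  static List<Integer> runOrder(int poolSize, int taskCount) throws Exception
  {
    // Assumption: a plain fixed pool stands in for the shared merge executor.
    ExecutorService sharedExecutor = Executors.newFixedThreadPool(poolSize);
    List<Integer> ran = Collections.synchronizedList(new ArrayList<>());
    List<Future<?>> futures = new ArrayList<>();
    try {
      for (int i = 0; i < taskCount; i++) {
        final int taskId = i;
        // Each task records its id when it runs; tasks beyond the pool
        // size wait in the executor's queue until a thread is free.
        futures.add(sharedExecutor.submit(() -> {
          ran.add(taskId);
        }));
      }
      // Block until every task has completed.
      for (Future<?> future : futures) {
        future.get();
      }
    }
    finally {
      sharedExecutor.shutdownNow();
    }
    return new ArrayList<>(ran);
  }

  public static void main(String[] args) throws Exception
  {
    // With a single worker thread, run order equals submission order.
    System.out.println(runOrder(1, 5)); // prints [0, 1, 2, 3, 4]
  }
}
```

With more than one thread in the pool, tasks are still dequeued in
submission order, but they may run concurrently, so the recorded order is
no longer guaranteed. Prioritization would need a different queue (for
example a priority queue) plus a criterion for ordering, which is the open
question raised above.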