Copilot commented on code in PR #18785:
URL: https://github.com/apache/pinot/pull/18785#discussion_r3425345261
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -483,7 +487,37 @@ protected boolean consumeLoop()
.create(_currentOffset); // so that we always update the metric when we enter this method.
_segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset, _finalOffset);
+ boolean wasPausedForMemory = false;
while (!_shouldStop && !endCriteriaReached()) {
+ // Server memory guard: while the server is under heap pressure, park this consumer instead of fetching more
+ // messages, so memory stops growing; resume automatically once the
+ // pressure clears. Only applies in INITIAL_CONSUMING - the catch-up states (CATCHING_UP / CONSUMING_TO_ONLINE)
+ // must keep making progress to complete the Helix transition. The sleep sits above the fetchMessages try-block
+ // so that an interrupt from stop() terminates the thread promptly instead of being misclassified as a transient
+ // stream error. endCriteriaReached() is still evaluated every iteration, so force-commit, time-limit and
+ // end-of-partition all continue to take precedence over the pause.
+ if (_state == State.INITIAL_CONSUMING && _ingestionMemoryGuard.shouldPauseConsumption(
+ _partitionUpsertMetadataManager, _partitionDedupMetadataManager)) {
+ if (!wasPausedForMemory) {
+ _segmentLogger.warn("Pausing consumption due to server memory pressure");
+ wasPausedForMemory = true;
+ }
+ _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_CONSUMPTION_PAUSED_MEMORY, 1L);
Review Comment:
When the memory guard pauses consumption on the first iteration (heap
pressure is already true), this loop hits `continue` before setting
`LLC_PARTITION_CONSUMING` to 1. The gauge can then stay at 0 for the whole
pause window, so monitoring reports the partition as not consuming even
though the consumer thread is alive and merely parked.
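
One possible way to address this, shown as a minimal self-contained sketch rather than the actual Pinot code: publish the gauge before the pause check, so a pass that parks still reports 1. The class name `ConsumeLoopGaugeSketch`, the `CONSUMING_GAUGE` field and the `consumeLoop` signature are hypothetical stand-ins for `consumeLoop()` and `LLC_PARTITION_CONSUMING`.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified stand-in for the consume loop; not the Pinot implementation.
class ConsumeLoopGaugeSketch {
  // Stand-in for the LLC_PARTITION_CONSUMING gauge (0 = not consuming, 1 = consuming).
  static final AtomicLong CONSUMING_GAUGE = new AtomicLong(0);

  // Runs `iterations` loop passes and returns how many passes fetched messages.
  static int consumeLoop(BooleanSupplier shouldPause, int iterations) {
    int fetches = 0;
    for (int i = 0; i < iterations; i++) {
      // Publish the gauge before the pause check, so a pass that parks still reports 1.
      CONSUMING_GAUGE.set(1);
      if (shouldPause.getAsBoolean()) {
        continue; // parked: skip the fetch for this pass
      }
      fetches++; // stand-in for fetching and indexing a batch
    }
    return fetches;
  }

  public static void main(String[] args) {
    // Heap pressure is already true on the first pass.
    int fetches = consumeLoop(() -> true, 3);
    System.out.println("fetches=" + fetches + ", gauge=" + CONSUMING_GAUGE.get());
  }
}
```

With the gauge update placed after the pause check instead, the same run would leave the gauge at 0 for the entire pause window, which is the behavior described above.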
##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:
##########
@@ -51,6 +51,10 @@ public enum ServerMeter implements AbstractMetrics.Meter {
REALTIME_CLP_UNENCODABLE("rows", false),
REALTIME_CLP_ENCODED_NON_STRINGS("rows", false),
REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
+ // Incremented once per pause check-interval that a consuming segment is parked by the server-local memory guard
+ // (RealtimeIngestionMemoryGuard) due to heap pressure or a primary-key cap. Reported per table so operators can see
+ // which tables are being throttled.
Review Comment:
The comment mentions pausing due to a "primary-key cap", but the current
guard implementation only pauses based on heap-pressure hysteresis (no
primary-key cap config/trigger exists in this PR). This makes the metric
documentation misleading.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1546,6 +1546,29 @@ public static class Server {
// Default to 0.0 (no limit)
public static final double DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT = 0.0;
+ // Configs for the server-local realtime ingestion memory guard (RealtimeIngestionMemoryGuard). When the JVM heap
+ // crosses the pause threshold, realtime consumers are parked (stop fetching from the stream) until heap recovers
+ // below the resume threshold, protecting the server from OOM (primarily driven by on-heap upsert/dedup primary-key
+ // metadata). It is server-local and self-healing.
+ // Mode controls which tables are guarded: ALL | UPSERT_DEDUP_ONLY | DISABLED. On by default, scoped to the
+ // upsert/dedup tables whose on-heap metadata is the main OOM driver.
+ public static final String CONFIG_OF_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE =
+ "pinot.server.consumption.memory.pause.mode";
+ public static final String DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE = "UPSERT_DEDUP_ONLY";
Review Comment:
The PR description/config table calls out an optional
`pinot.server.consumption.memory.pause.primary.key.cap` trigger, but there is
no corresponding constant here and no primary-key-cap logic anywhere in the
code (also no `getNumberOfPrimaryKeys()` API change referenced in the
description). Either implement the cap trigger end-to-end (config constant +
guard logic + tests) or update the PR description/metrics comments to match the
shipped behavior.
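
If the cap trigger is implemented rather than dropped from the description, the guard decision could look roughly like the sketch below. Everything here is hypothetical: only the config key string comes from the PR description, while the class name, the constant names, the "0 disables the cap" convention and the `shouldPause` signature are assumptions for illustration.

```java
// Hypothetical sketch of a primary-key-cap trigger; none of this exists in the PR as reviewed.
class PrimaryKeyCapSketch {
  // Config key quoted from the PR description; the constant name is an assumption.
  static final String CONFIG_OF_PRIMARY_KEY_CAP = "pinot.server.consumption.memory.pause.primary.key.cap";
  // Assumed convention: a cap of 0 (or less) disables the trigger.
  static final long DEFAULT_PRIMARY_KEY_CAP = 0L;

  // Pause when heap pressure is active, or when a positive cap has been reached.
  static boolean shouldPause(boolean heapPressure, long numPrimaryKeys, long primaryKeyCap) {
    boolean capReached = primaryKeyCap > 0 && numPrimaryKeys >= primaryKeyCap;
    return heapPressure || capReached;
  }

  public static void main(String[] args) {
    System.out.println(CONFIG_OF_PRIMARY_KEY_CAP + " disabled: "
        + shouldPause(false, 5_000_000L, DEFAULT_PRIMARY_KEY_CAP));
    System.out.println(CONFIG_OF_PRIMARY_KEY_CAP + " = 1000000: "
        + shouldPause(false, 5_000_000L, 1_000_000L));
  }
}
```

A real implementation would also need the primary-key count from the upsert/dedup metadata managers, which is the API change the description refers to but the diff does not contain.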
##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java:
##########
@@ -43,6 +43,10 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true),
// Gauge to reflect whether pauseless is enabled or not
PAUSELESS_CONSUMPTION_ENABLED("pauselessConsumptionEnabled", false),
+ // Server-global gauge (0/1) reflecting whether realtime ingestion is currently paused on this server due to JVM heap
+ // pressure (see RealtimeIngestionMemoryGuard). Lets dashboards/alerts distinguish a deliberate memory pause from a
+ // wedged consumer.
Review Comment:
The gauge description says realtime ingestion is "currently paused". Since
the guard updates this gauge when heap-pressure flips (not when any specific
table is actually parked), it is more accurate to describe it as the
memory-guard heap-pressure trigger being active (which will pause *eligible*
consumers based on mode).
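
To illustrate why "trigger active" and "consumer paused" are different things, here is a minimal sketch of a mode-based eligibility check. The body of `shouldPauseConsumption` is not visible in this diff, so the logic below is an assumption based on the `Mode` Javadoc; `GuardEligibilitySketch` and the boolean `isUpsertOrDedupTable` parameter are hypothetical.

```java
// Hypothetical sketch of mode-based eligibility; the real shouldPauseConsumption is not shown in the diff.
class GuardEligibilitySketch {
  enum Mode { ALL, UPSERT_DEDUP_ONLY, DISABLED }

  // The server-wide heap-pressure flag only pauses consumers that the mode makes eligible.
  static boolean shouldPause(Mode mode, boolean heapPressure, boolean isUpsertOrDedupTable) {
    if (!heapPressure) {
      return false;
    }
    switch (mode) {
      case ALL:
        return true;
      case UPSERT_DEDUP_ONLY:
        return isUpsertOrDedupTable;
      default:
        return false; // DISABLED
    }
  }

  public static void main(String[] args) {
    // Trigger is active, but a plain realtime table keeps consuming in UPSERT_DEDUP_ONLY mode.
    System.out.println(shouldPause(Mode.UPSERT_DEDUP_ONLY, true, false));
  }
}
```

Under these assumptions a server hosting only non-upsert/dedup tables would set the gauge to 1 while no consumer is parked, which is why describing the gauge as the trigger state is more accurate.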
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeIngestionMemoryGuard.java:
##########
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Locale;
+import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
+import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
+import org.apache.pinot.spi.utils.ResourceUsageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Server-local guard that pauses realtime ingestion when the JVM is under memory pressure, to protect the server from
+/// OOM. The dominant driver this targets is the on-heap upsert/dedup primary-key metadata, which grows with ingestion
+/// and is not freed by committing a consuming segment.
+///
+/// A single daemon thread samples the JVM heap every `checkIntervalMs` and maintains a `_heapPressure` flag with
+/// hysteresis: it flips to `true` once `usedHeap / maxHeap >= pauseRatio` and back to `false` once
+/// `usedHeap / maxHeap <= resumeRatio`. Realtime consumers consult [#shouldPauseConsumption] inside their consume loop
+/// and park (stop fetching from the stream) while it returns `true`, so memory stops growing; they resume
+/// automatically when heap recovers. This is server-local and self-healing —
it does not involve the controller or
+/// ZooKeeper.
+///
+/// SCOPE / RESIDUAL EXPOSURE: the pause is applied only while a segment is in its open-ended `INITIAL_CONSUMING`
+/// phase. Segments catching up to a target offset (`CATCHING_UP` / `CONSUMING_TO_ONLINE`, e.g. right after a server
+/// restart or during lag recovery) are intentionally NOT paused, because stalling them would wedge the Helix
+/// CONSUMING -> ONLINE state transition (`CATCHING_UP` has no time bound). During catch-up the on-heap upsert/dedup
+/// metadata can still grow, so this guard dampens and delays OOM rather than guaranteeing prevention; it buys time for
+/// GC, segment commits, and TTL eviction. See [RealtimeSegmentDataManager#consumeLoop].
+///
+/// The [Mode] controls which tables are guarded ([Mode#ALL], [Mode#UPSERT_DEDUP_ONLY], or [Mode#DISABLED]). To avoid
+/// wedging ingestion if the sampler thread ever dies, the trigger fails open: if the last sample is older than a
+/// staleness threshold, [#shouldPauseConsumption] ignores the (possibly stuck) `_heapPressure` flag.
+///
+/// This class is a process-wide singleton (mirroring [RealtimeConsumptionRateManager]); it is initialized once from
+/// server config at startup via [#init] and read directly by [RealtimeSegmentDataManager]. It is thread-safe.
+public class RealtimeIngestionMemoryGuard {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeIngestionMemoryGuard.class);
+
+ /// Which realtime tables the guard applies to.
+ public enum Mode {
+ /// Pause consumption for any realtime table under memory pressure.
+ ALL,
+ /// Pause consumption only for upsert/dedup tables (whose on-heap metadata is the main OOM driver).
+ UPSERT_DEDUP_ONLY,
+ /// Guard is turned off; consumption is never paused.
+ DISABLED
+ }
+
+ private static final RealtimeIngestionMemoryGuard INSTANCE = new RealtimeIngestionMemoryGuard();
+
+ public static RealtimeIngestionMemoryGuard getInstance() {
+ return INSTANCE;
+ }
+
+ private final LongSupplier _usedHeapSupplier;
+ private final LongSupplier _maxHeapSupplier;
+ private final LongSupplier _clockMs;
+
+ // Configuration, set on init() and treated as effectively immutable afterwards.
+ private volatile Mode _mode = Mode.DISABLED;
+ private volatile double _pauseRatio = Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_HEAP_RATIO;
+ private volatile double _resumeRatio = Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_RESUME_HEAP_RATIO;
+ private volatile long _checkIntervalMs = Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_CHECK_INTERVAL_MS;
+ private volatile long _staleThresholdMs = 5 * Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_CHECK_INTERVAL_MS;
+
+ // Runtime state.
+ private volatile boolean _heapPressure = false;
+ private volatile long _lastSampleTimeMs = 0;
+ @Nullable
+ private volatile ServerMetrics _serverMetrics;
+ private boolean _initialized = false;
+
+ private RealtimeIngestionMemoryGuard() {
+ this(ResourceUsageUtils::getUsedHeapSize, ResourceUsageUtils::getMaxHeapSize, System::currentTimeMillis);
+ }
+
+ @VisibleForTesting
+ RealtimeIngestionMemoryGuard(LongSupplier usedHeapSupplier, LongSupplier maxHeapSupplier, LongSupplier clockMs) {
+ _usedHeapSupplier = usedHeapSupplier;
+ _maxHeapSupplier = maxHeapSupplier;
+ _clockMs = clockMs;
+ }
+
+ /// Initializes the guard from server config and, unless the mode is [Mode#DISABLED], starts the heap sampler thread.
+ /// Safe to call once at server startup; subsequent calls are ignored (the guard is configured once per process).
+ public synchronized void init(PinotConfiguration serverConfig, ServerMetrics serverMetrics) {
+ if (_initialized) {
+ LOGGER.warn("RealtimeIngestionMemoryGuard already initialized, ignoring re-init");
+ return;
+ }
+ _initialized = true;
+ _serverMetrics = serverMetrics;
+ applyConfig(serverConfig);
+
+ if (_mode == Mode.DISABLED) {
+ LOGGER.info("RealtimeIngestionMemoryGuard is disabled");
+ return;
+ }
+ LOGGER.info("Starting RealtimeIngestionMemoryGuard: mode={}, pauseHeapRatio={}, resumeHeapRatio={}, "
+ + "checkIntervalMs={}, maxHeapBytes={}", _mode, _pauseRatio, _resumeRatio, _checkIntervalMs,
+ _maxHeapSupplier.getAsLong());
+ // Daemon sampler: dies on JVM exit, so no explicit shutdown is needed (mirrors RealtimeConsumptionRateManager).
+ Thread samplerThread = new Thread(this::samplerLoop, "RealtimeIngestionMemoryGuard");
+ samplerThread.setDaemon(true);
+ samplerThread.start();
+ }
+
+ /// Parses and validates the guard configuration into the in-memory fields, without starting the sampler thread.
+ /// Visible for testing so the decision logic can be configured deterministically.
+ @VisibleForTesting
+ void applyConfig(PinotConfiguration serverConfig) {
+ _mode = parseMode(serverConfig.getProperty(Server.CONFIG_OF_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE,
+ Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE));
+
+ double pauseRatio = serverConfig.getProperty(Server.CONFIG_OF_SERVER_CONSUMPTION_MEMORY_PAUSE_HEAP_RATIO,
+ Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_HEAP_RATIO);
+ double resumeRatio = serverConfig.getProperty(Server.CONFIG_OF_SERVER_CONSUMPTION_MEMORY_RESUME_HEAP_RATIO,
+ Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_RESUME_HEAP_RATIO);
+ // Require 0 < resumeRatio < pauseRatio <= 1.0 so the hysteresis band is well-formed; otherwise reject both and
+ // fall back to the defaults rather than silently consuming bad thresholds.
+ if (!(pauseRatio > 0.0 && pauseRatio <= 1.0 && resumeRatio > 0.0 && resumeRatio < pauseRatio)) {
+ LOGGER.warn("Invalid heap usage ratios (pause={}, resume={}); falling back to defaults (pause={}, resume={})",
+ pauseRatio, resumeRatio, Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_HEAP_RATIO,
+ Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_RESUME_HEAP_RATIO);
+ pauseRatio = Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_HEAP_RATIO;
+ resumeRatio = Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_RESUME_HEAP_RATIO;
+ }
+ _pauseRatio = pauseRatio;
+ _resumeRatio = resumeRatio;
+
+ long checkIntervalMs = serverConfig.getProperty(Server.CONFIG_OF_SERVER_CONSUMPTION_MEMORY_CHECK_INTERVAL_MS,
+ Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_CHECK_INTERVAL_MS);
+ if (checkIntervalMs <= 0) {
+ LOGGER.warn("Invalid check interval {}ms; falling back to default {}ms", checkIntervalMs,
+ Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_CHECK_INTERVAL_MS);
+ checkIntervalMs = Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_CHECK_INTERVAL_MS;
+ }
+ _checkIntervalMs = checkIntervalMs;
+ // Treat the heap-pressure flag as stale (and so fail open) if the sampler has not refreshed it for several check
+ // intervals, which indicates the sampler thread has died.
+ _staleThresholdMs = 5 * checkIntervalMs;
+ }
+
+ private static Mode parseMode(String modeStr) {
+ try {
+ return Mode.valueOf(modeStr.trim().toUpperCase(Locale.ENGLISH));
+ } catch (IllegalArgumentException e) {
+ LOGGER.warn("Invalid {} value '{}'; falling back to default '{}'",
+ Server.CONFIG_OF_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE, modeStr,
+ Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE);
+ return Mode.valueOf(Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE);
+ }
+ }
+
+ private void samplerLoop() {
+ while (true) {
+ try {
+ sampleOnce();
+ } catch (Throwable t) {
+ LOGGER.warn("Error while sampling heap usage in RealtimeIngestionMemoryGuard", t);
+ }
+ try {
+ Thread.sleep(_checkIntervalMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+
+ /// Samples heap usage once and updates the [#_heapPressure] flag using hysteresis. Visible for testing so the
+ /// decision logic can be driven deterministically without the sampler thread.
+ @VisibleForTesting
+ void sampleOnce() {
+ long maxHeap = _maxHeapSupplier.getAsLong();
+ long usedHeap = _usedHeapSupplier.getAsLong();
+ if (maxHeap <= 0) {
+ // Cannot compute a ratio without a heap max; leave the pressure state and heartbeat untouched so that, if this
+ // persists, the staleness check in shouldPauseConsumption() fails open rather than honoring a stuck flag.
+ return;
+ }
+ // Refresh the heartbeat only after a usable sample so a live-but-blind sampler cannot keep a stuck flag "fresh".
+ _lastSampleTimeMs = _clockMs.getAsLong();
+ double ratio = (double) usedHeap / maxHeap;
+ boolean previous = _heapPressure;
+ boolean next = previous;
+ if (!previous && ratio >= _pauseRatio) {
+ next = true;
+ } else if (previous && ratio <= _resumeRatio) {
+ next = false;
+ }
+ if (next != previous) {
+ _heapPressure = next;
+ ServerMetrics serverMetrics = _serverMetrics;
+ if (serverMetrics != null) {
+ serverMetrics.setValueOfGlobalGauge(ServerGauge.REALTIME_INGESTION_MEMORY_PAUSED, next ? 1L : 0L);
+ }
+ if (next) {
+ LOGGER.warn("Pausing realtime ingestion due to heap pressure: usedHeap={} bytes, maxHeap={} bytes, "
+ + "ratio={} >= pauseRatio={}", usedHeap, maxHeap, ratio, _pauseRatio);
+ } else {
+ LOGGER.info("Resuming realtime ingestion: usedHeap={} bytes, maxHeap={} bytes, ratio={} <= resumeRatio={}",
+ usedHeap, maxHeap, ratio, _resumeRatio);
+ }
Review Comment:
The gauge/log messages currently say ingestion is "paused/resumed" purely
when the heap-pressure flag flips. In `UPSERT_DEDUP_ONLY` mode that flag can be
true even if a server is hosting only non-upsert/dedup realtime tables (so
nothing is actually paused). Consider wording these as "heap-pressure trigger
active/cleared" (the guard will pause eligible consumers) to avoid misleading
operators.
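
As a rough illustration of the suggested wording, the sketch below reproduces the hysteresis flip from `sampleOnce()` in a standalone class and returns trigger-oriented messages instead of "pausing/resuming". The class `HeapPressureTriggerSketch`, the `sample` method, the message text and the 0.90/0.80 ratios used in `main` are all assumptions for illustration; the PR's default ratios are not shown in this diff.

```java
// Hypothetical standalone sketch of the hysteresis flip with trigger-oriented wording.
class HeapPressureTriggerSketch {
  private final double pauseRatio;
  private final double resumeRatio;
  private boolean heapPressure = false;

  HeapPressureTriggerSketch(double pauseRatio, double resumeRatio) {
    this.pauseRatio = pauseRatio;
    this.resumeRatio = resumeRatio;
  }

  // Returns the message to log when the flag flips, or null when the state is unchanged.
  String sample(long usedHeap, long maxHeap) {
    if (maxHeap <= 0) {
      return null; // no usable sample; leave the state untouched
    }
    double ratio = (double) usedHeap / maxHeap;
    if (!heapPressure && ratio >= pauseRatio) {
      heapPressure = true;
      return "Heap-pressure trigger active; eligible consumers will pause";
    }
    if (heapPressure && ratio <= resumeRatio) {
      heapPressure = false;
      return "Heap-pressure trigger cleared; eligible consumers will resume";
    }
    return null; // inside the hysteresis band, or no threshold crossed
  }

  boolean isHeapPressure() {
    return heapPressure;
  }

  public static void main(String[] args) {
    HeapPressureTriggerSketch trigger = new HeapPressureTriggerSketch(0.90, 0.80);
    System.out.println(trigger.sample(95, 100));
    System.out.println(trigger.sample(85, 100)); // still inside the band, no flip
    System.out.println(trigger.sample(75, 100));
  }
}
```

The same rewording would apply to the `REALTIME_INGESTION_MEMORY_PAUSED` gauge comment, since the gauge is set at the same flip point.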
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]