Copilot commented on code in PR #18785:
URL: https://github.com/apache/pinot/pull/18785#discussion_r3425345261


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -483,7 +487,37 @@ protected boolean consumeLoop()
         .create(_currentOffset);  // so that we always update the metric when we enter this method.
 
     _segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", _currentOffset, _finalOffset);
+    boolean wasPausedForMemory = false;
     while (!_shouldStop && !endCriteriaReached()) {
+      // Server memory guard: while the server is under heap pressure, park this consumer instead of fetching more
+      // messages, so memory stops growing; resume automatically once the
+      // pressure clears. Only applies in INITIAL_CONSUMING - the catch-up states (CATCHING_UP / CONSUMING_TO_ONLINE)
+      // must keep making progress to complete the Helix transition. The sleep sits above the fetchMessages try-block
+      // so that an interrupt from stop() terminates the thread promptly instead of being misclassified as a transient
+      // stream error. endCriteriaReached() is still evaluated every iteration, so force-commit, time-limit and
+      // end-of-partition all continue to take precedence over the pause.
+      if (_state == State.INITIAL_CONSUMING && _ingestionMemoryGuard.shouldPauseConsumption(
+          _partitionUpsertMetadataManager, _partitionDedupMetadataManager)) {
+        if (!wasPausedForMemory) {
+          _segmentLogger.warn("Pausing consumption due to server memory pressure");
+          wasPausedForMemory = true;
+        }
+        _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_CONSUMPTION_PAUSED_MEMORY, 1L);

Review Comment:
   When the memory guard pauses consumption on the first iteration (heap 
pressure is already set), the loop hits `continue` before setting 
`LLC_PARTITION_CONSUMING` to 1. The gauge can then stay at 0 for the whole 
pause window, so monitoring reports the partition as not consuming even 
though the consumer thread is alive and merely parked.
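
   One possible ordering is sketched below, as a standalone simulation rather 
than a patch. `ConsumeLoopGaugeSketch`, its two counters, and the `shouldPause` 
supplier are hypothetical stand-ins for the real loop, the 
`LLC_PARTITION_CONSUMING` gauge, the `REALTIME_CONSUMPTION_PAUSED_MEMORY` meter, 
and the guard; none of them are Pinot APIs. The only point it illustrates is 
publishing the gauge before the pause check.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the consume loop; these names are not Pinot APIs. */
final class ConsumeLoopGaugeSketch {
  // Stand-in for the LLC_PARTITION_CONSUMING gauge (0 = not consuming, 1 = consuming).
  final AtomicLong partitionConsumingGauge = new AtomicLong(0);
  // Stand-in for the REALTIME_CONSUMPTION_PAUSED_MEMORY meter.
  final AtomicLong pausedMeter = new AtomicLong(0);

  /** Runs maxIterations loop iterations and returns how many of them fetched messages. */
  int consumeLoop(BooleanSupplier shouldPause, int maxIterations) {
    int fetches = 0;
    for (int i = 0; i < maxIterations; i++) {
      // Publish the gauge before the pause check, so a pause that starts on the
      // very first iteration still reports the partition as consuming.
      partitionConsumingGauge.set(1);
      if (shouldPause.getAsBoolean()) {
        pausedMeter.incrementAndGet();
        continue; // the real loop would sleep for the check interval here
      }
      fetches++; // stand-in for fetching and indexing a batch of messages
    }
    return fetches;
  }
}
```

   With a supplier that always returns `true`, the loop never fetches, yet the 
gauge stand-in reads 1 after the first iteration.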



##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:
##########
@@ -51,6 +51,10 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   REALTIME_CLP_UNENCODABLE("rows", false),
   REALTIME_CLP_ENCODED_NON_STRINGS("rows", false),
   REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
+  // Incremented once per pause check-interval that a consuming segment is parked by the server-local memory guard
+  // (RealtimeIngestionMemoryGuard) due to heap pressure or a primary-key cap. Reported per table so operators can see
+  // which tables are being throttled.

Review Comment:
   The comment mentions pausing due to a "primary-key cap", but the current 
guard implementation only pauses based on heap-pressure hysteresis (no 
primary-key cap config/trigger exists in this PR). This makes the metric 
documentation misleading.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1546,6 +1546,29 @@ public static class Server {
     // Default to 0.0 (no limit)
     public static final double DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT = 0.0;
 
+    // Configs for the server-local realtime ingestion memory guard (RealtimeIngestionMemoryGuard). When the JVM heap
+    // crosses the pause threshold, realtime consumers are parked (stop fetching from the stream) until heap recovers
+    // below the resume threshold, protecting the server from OOM (primarily driven by on-heap upsert/dedup primary-key
+    // metadata). It is server-local and self-healing.
+    // Mode controls which tables are guarded: ALL | UPSERT_DEDUP_ONLY | DISABLED. On by default, scoped to the
+    // upsert/dedup tables whose on-heap metadata is the main OOM driver.
+    public static final String CONFIG_OF_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE =
+        "pinot.server.consumption.memory.pause.mode";
+    public static final String DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE = "UPSERT_DEDUP_ONLY";

Review Comment:
   The PR description/config table calls out an optional 
`pinot.server.consumption.memory.pause.primary.key.cap` trigger, but there is 
no corresponding constant here and no primary-key-cap logic anywhere in the 
code (also no `getNumberOfPrimaryKeys()` API change referenced in the 
description). Either implement the cap trigger end-to-end (config constant + 
guard logic + tests) or update the PR description/metrics comments to match the 
shipped behavior.



##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java:
##########
@@ -43,6 +43,10 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true),
   // Gauge to reflect whether pauseless is enabled or not
   PAUSELESS_CONSUMPTION_ENABLED("pauselessConsumptionEnabled", false),
+  // Server-global gauge (0/1) reflecting whether realtime ingestion is currently paused on this server due to JVM heap
+  // pressure (see RealtimeIngestionMemoryGuard). Lets dashboards/alerts distinguish a deliberate memory pause from a
+  // wedged consumer.

Review Comment:
   The gauge description says realtime ingestion is "currently paused". Since 
the guard updates this gauge when heap-pressure flips (not when any specific 
table is actually parked), it is more accurate to describe it as the 
memory-guard heap-pressure trigger being active (which will pause *eligible* 
consumers based on mode).



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeIngestionMemoryGuard.java:
##########
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Locale;
+import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
+import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
+import org.apache.pinot.spi.utils.ResourceUsageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Server-local guard that pauses realtime ingestion when the JVM is under memory pressure, to protect the server from
+/// OOM. The dominant driver this targets is the on-heap upsert/dedup primary-key metadata, which grows with ingestion
+/// and is not freed by committing a consuming segment.
+///
+/// A single daemon thread samples the JVM heap every `checkIntervalMs` and maintains a `_heapPressure` flag with
+/// hysteresis: it flips to `true` once `usedHeap / maxHeap >= pauseRatio` and back to `false` once
+/// `usedHeap / maxHeap <= resumeRatio`. Realtime consumers consult [#shouldPauseConsumption] inside their consume loop
+/// and park (stop fetching from the stream) while it returns `true`, so memory stops growing; they resume
+/// automatically when heap recovers. This is server-local and self-healing — it does not involve the controller or
+/// ZooKeeper.
+///
+/// SCOPE / RESIDUAL EXPOSURE: the pause is applied only while a segment is in its open-ended `INITIAL_CONSUMING`
+/// phase. Segments catching up to a target offset (`CATCHING_UP` / `CONSUMING_TO_ONLINE`, e.g. right after a server
+/// restart or during lag recovery) are intentionally NOT paused, because stalling them would wedge the Helix
+/// CONSUMING -> ONLINE state transition (`CATCHING_UP` has no time bound). During catch-up the on-heap upsert/dedup
+/// metadata can still grow, so this guard dampens and delays OOM rather than guaranteeing prevention; it buys time for
+/// GC, segment commits, and TTL eviction. See [RealtimeSegmentDataManager#consumeLoop].
+///
+/// The [Mode] controls which tables are guarded ([Mode#ALL], [Mode#UPSERT_DEDUP_ONLY], or [Mode#DISABLED]). To avoid
+/// wedging ingestion if the sampler thread ever dies, the trigger fails open: if the last sample is older than a
+/// staleness threshold, [#shouldPauseConsumption] ignores the (possibly stuck) `_heapPressure` flag.
+///
+/// This class is a process-wide singleton (mirroring [RealtimeConsumptionRateManager]); it is initialized once from
+/// server config at startup via [#init] and read directly by [RealtimeSegmentDataManager]. It is thread-safe.
+public class RealtimeIngestionMemoryGuard {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeIngestionMemoryGuard.class);
+
+  /// Which realtime tables the guard applies to.
+  public enum Mode {
+    /// Pause consumption for any realtime table under memory pressure.
+    ALL,
+    /// Pause consumption only for upsert/dedup tables (whose on-heap metadata is the main OOM driver).
+    UPSERT_DEDUP_ONLY,
+    /// Guard is turned off; consumption is never paused.
+    DISABLED
+  }
+
+  private static final RealtimeIngestionMemoryGuard INSTANCE = new RealtimeIngestionMemoryGuard();
+
+  public static RealtimeIngestionMemoryGuard getInstance() {
+    return INSTANCE;
+  }
+
+  private final LongSupplier _usedHeapSupplier;
+  private final LongSupplier _maxHeapSupplier;
+  private final LongSupplier _clockMs;
+
+  // Configuration, set on init() and treated as effectively immutable afterwards.
+  private volatile Mode _mode = Mode.DISABLED;
+  private volatile double _pauseRatio = Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_HEAP_RATIO;
+  private volatile double _resumeRatio = Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_RESUME_HEAP_RATIO;
+  private volatile long _checkIntervalMs = Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_CHECK_INTERVAL_MS;
+  private volatile long _staleThresholdMs = 5 * Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_CHECK_INTERVAL_MS;
+
+  // Runtime state.
+  private volatile boolean _heapPressure = false;
+  private volatile long _lastSampleTimeMs = 0;
+  @Nullable
+  private volatile ServerMetrics _serverMetrics;
+  private boolean _initialized = false;
+
+  private RealtimeIngestionMemoryGuard() {
+    this(ResourceUsageUtils::getUsedHeapSize, ResourceUsageUtils::getMaxHeapSize, System::currentTimeMillis);
+  }
+
+  @VisibleForTesting
+  RealtimeIngestionMemoryGuard(LongSupplier usedHeapSupplier, LongSupplier maxHeapSupplier, LongSupplier clockMs) {
+    _usedHeapSupplier = usedHeapSupplier;
+    _maxHeapSupplier = maxHeapSupplier;
+    _clockMs = clockMs;
+  }
+
+  /// Initializes the guard from server config and, unless the mode is [Mode#DISABLED], starts the heap sampler thread.
+  /// Safe to call once at server startup; subsequent calls are ignored (the guard is configured once per process).
+  public synchronized void init(PinotConfiguration serverConfig, ServerMetrics serverMetrics) {
+    if (_initialized) {
+      LOGGER.warn("RealtimeIngestionMemoryGuard already initialized, ignoring re-init");
+      return;
+    }
+    _initialized = true;
+    _serverMetrics = serverMetrics;
+    applyConfig(serverConfig);
+
+    if (_mode == Mode.DISABLED) {
+      LOGGER.info("RealtimeIngestionMemoryGuard is disabled");
+      return;
+    }
+    LOGGER.info("Starting RealtimeIngestionMemoryGuard: mode={}, pauseHeapRatio={}, resumeHeapRatio={}, "
+            + "checkIntervalMs={}, maxHeapBytes={}", _mode, _pauseRatio, _resumeRatio, _checkIntervalMs,
+        _maxHeapSupplier.getAsLong());
+    // Daemon sampler: dies on JVM exit, so no explicit shutdown is needed (mirrors RealtimeConsumptionRateManager).
+    Thread samplerThread = new Thread(this::samplerLoop, "RealtimeIngestionMemoryGuard");
+    samplerThread.setDaemon(true);
+    samplerThread.start();
+  }
+
+  /// Parses and validates the guard configuration into the in-memory fields, without starting the sampler thread.
+  /// Visible for testing so the decision logic can be configured deterministically.
+  @VisibleForTesting
+  void applyConfig(PinotConfiguration serverConfig) {
+    _mode = parseMode(serverConfig.getProperty(Server.CONFIG_OF_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE,
+        Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE));
+
+    double pauseRatio = serverConfig.getProperty(Server.CONFIG_OF_SERVER_CONSUMPTION_MEMORY_PAUSE_HEAP_RATIO,
+        Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_HEAP_RATIO);
+    double resumeRatio = serverConfig.getProperty(Server.CONFIG_OF_SERVER_CONSUMPTION_MEMORY_RESUME_HEAP_RATIO,
+        Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_RESUME_HEAP_RATIO);
+    // Require 0 < resumeRatio < pauseRatio <= 1.0 so the hysteresis band is well-formed; otherwise reject both and
+    // fall back to the defaults rather than silently consuming bad thresholds.
+    if (!(pauseRatio > 0.0 && pauseRatio <= 1.0 && resumeRatio > 0.0 && resumeRatio < pauseRatio)) {
+      LOGGER.warn("Invalid heap usage ratios (pause={}, resume={}); falling back to defaults (pause={}, resume={})",
+          pauseRatio, resumeRatio, Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_HEAP_RATIO,
+          Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_RESUME_HEAP_RATIO);
+      pauseRatio = Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_HEAP_RATIO;
+      resumeRatio = Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_RESUME_HEAP_RATIO;
+    }
+    _pauseRatio = pauseRatio;
+    _resumeRatio = resumeRatio;
+
+    long checkIntervalMs = serverConfig.getProperty(Server.CONFIG_OF_SERVER_CONSUMPTION_MEMORY_CHECK_INTERVAL_MS,
+        Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_CHECK_INTERVAL_MS);
+    if (checkIntervalMs <= 0) {
+      LOGGER.warn("Invalid check interval {}ms; falling back to default {}ms", checkIntervalMs,
+          Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_CHECK_INTERVAL_MS);
+      checkIntervalMs = Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_CHECK_INTERVAL_MS;
+    }
+    _checkIntervalMs = checkIntervalMs;
+    // Treat the heap-pressure flag as stale (and so fail open) if the sampler has not refreshed it for several check
+    // intervals, which indicates the sampler thread has died.
+    _staleThresholdMs = 5 * checkIntervalMs;
+  }
+
+  private static Mode parseMode(String modeStr) {
+    try {
+      return Mode.valueOf(modeStr.trim().toUpperCase(Locale.ENGLISH));
+    } catch (IllegalArgumentException e) {
+      LOGGER.warn("Invalid {} value '{}'; falling back to default '{}'",
+          Server.CONFIG_OF_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE, modeStr,
+          Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE);
+      return Mode.valueOf(Server.DEFAULT_SERVER_CONSUMPTION_MEMORY_PAUSE_MODE);
+    }
+  }
+
+  private void samplerLoop() {
+    while (true) {
+      try {
+        sampleOnce();
+      } catch (Throwable t) {
+        LOGGER.warn("Error while sampling heap usage in RealtimeIngestionMemoryGuard", t);
+      }
+      try {
+        Thread.sleep(_checkIntervalMs);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+  /// Samples heap usage once and updates the [#_heapPressure] flag using hysteresis. Visible for testing so the
+  /// decision logic can be driven deterministically without the sampler thread.
+  @VisibleForTesting
+  void sampleOnce() {
+    long maxHeap = _maxHeapSupplier.getAsLong();
+    long usedHeap = _usedHeapSupplier.getAsLong();
+    if (maxHeap <= 0) {
+      // Cannot compute a ratio without a heap max; leave the pressure state and heartbeat untouched so that, if this
+      // persists, the staleness check in shouldPauseConsumption() fails open rather than honoring a stuck flag.
+      return;
+    }
+    // Refresh the heartbeat only after a usable sample so a live-but-blind sampler cannot keep a stuck flag "fresh".
+    _lastSampleTimeMs = _clockMs.getAsLong();
+    double ratio = (double) usedHeap / maxHeap;
+    boolean previous = _heapPressure;
+    boolean next = previous;
+    if (!previous && ratio >= _pauseRatio) {
+      next = true;
+    } else if (previous && ratio <= _resumeRatio) {
+      next = false;
+    }
+    if (next != previous) {
+      _heapPressure = next;
+      ServerMetrics serverMetrics = _serverMetrics;
+      if (serverMetrics != null) {
+        serverMetrics.setValueOfGlobalGauge(ServerGauge.REALTIME_INGESTION_MEMORY_PAUSED, next ? 1L : 0L);
+      }
+      if (next) {
+        LOGGER.warn("Pausing realtime ingestion due to heap pressure: usedHeap={} bytes, maxHeap={} bytes, "
+            + "ratio={} >= pauseRatio={}", usedHeap, maxHeap, ratio, _pauseRatio);
+      } else {
+        LOGGER.info("Resuming realtime ingestion: usedHeap={} bytes, maxHeap={} bytes, ratio={} <= resumeRatio={}",
+            usedHeap, maxHeap, ratio, _resumeRatio);
+      }

Review Comment:
   The gauge/log messages currently say ingestion is "paused/resumed" purely 
when the heap-pressure flag flips. In `UPSERT_DEDUP_ONLY` mode that flag can be 
true even if a server is hosting only non-upsert/dedup realtime tables (so 
nothing is actually paused). Consider wording these as "heap-pressure trigger 
active/cleared" (the guard will pause eligible consumers) to avoid misleading 
operators.
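
   To make the distinction concrete, here is a minimal sketch that keeps the 
server-wide trigger separate from the per-consumer pause decision. It assumes a 
simplified model: `HeapPressureTriggerSketch`, its method names, the log 
strings, and the boolean `isUpsertOrDedupTable` parameter are all hypothetical, 
and the staleness check from the PR is omitted. The ratios passed in the usage 
note are illustrative values, not the PR's defaults.

```java
/** Hypothetical sketch; names and messages are illustrative, not the PR's API. */
final class HeapPressureTriggerSketch {
  enum Mode { ALL, UPSERT_DEDUP_ONLY, DISABLED }

  private final double pauseRatio;
  private final double resumeRatio;
  private boolean triggerActive;

  HeapPressureTriggerSketch(double pauseRatio, double resumeRatio) {
    this.pauseRatio = pauseRatio;
    this.resumeRatio = resumeRatio;
  }

  /** Applies hysteresis; returns the message to log when the trigger flips, else null. */
  String sample(long usedHeap, long maxHeap) {
    if (maxHeap <= 0) {
      return null; // no usable sample, leave the state untouched
    }
    double ratio = (double) usedHeap / maxHeap;
    if (!triggerActive && ratio >= pauseRatio) {
      triggerActive = true;
      return "Heap-pressure trigger active: eligible consumers will pause";
    }
    if (triggerActive && ratio <= resumeRatio) {
      triggerActive = false;
      return "Heap-pressure trigger cleared: eligible consumers will resume";
    }
    return null; // inside the hysteresis band, or no change
  }

  boolean isTriggerActive() {
    return triggerActive;
  }

  /** Whether a given consumer actually parks depends on the mode and the table type. */
  boolean shouldPause(Mode mode, boolean isUpsertOrDedupTable) {
    if (!triggerActive || mode == Mode.DISABLED) {
      return false;
    }
    return mode == Mode.ALL || isUpsertOrDedupTable;
  }
}
```

   For example, with `new HeapPressureTriggerSketch(0.9, 0.8)` and a sample of 
95 used out of 100, the trigger becomes active, yet 
`shouldPause(Mode.UPSERT_DEDUP_ONLY, false)` still returns `false`: the server 
is under pressure but that consumer keeps running, which is the case where a 
"paused" gauge or log line would mislead.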



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

