This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ff7df4893c8 Add server ingestion OOM protection (#18784)
ff7df4893c8 is described below

commit ff7df4893c898251c42911902aec31953638986b
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Jun 19 21:15:11 2026 -0700

    Add server ingestion OOM protection (#18784)
---
 .../apache/pinot/common/metrics/ServerGauge.java   |   2 +
 .../utils/config/TableConfigSerDeUtilsTest.java    |   3 +
 .../provider/DefaultTableDataManagerProvider.java  |   4 +-
 .../manager/provider/TableDataManagerProvider.java |   7 +-
 .../realtime/RealtimeSegmentDataManager.java       |  26 ++
 .../manager/realtime/RealtimeTableDataManager.java |  18 +-
 .../ServerIngestionOomProtectionManager.java       | 373 ++++++++++++++++
 .../realtime/RealtimeSegmentDataManagerTest.java   | 124 ++++++
 .../ServerIngestionOomProtectionManagerTest.java   | 492 +++++++++++++++++++++
 .../FailureInjectingRealtimeTableDataManager.java  |  10 +
 .../FailureInjectingTableDataManagerProvider.java  |   6 +-
 .../segment/local/utils/TableConfigUtils.java      |   5 +-
 .../segment/local/utils/TableConfigUtilsTest.java  |  25 ++
 .../server/starter/helix/BaseServerStarter.java    |   4 +
 .../starter/helix/HelixInstanceDataManager.java    |  11 +-
 .../table/ingestion/StreamIngestionConfig.java     |  14 +
 .../apache/pinot/spi/utils/CommonConstants.java    |  18 +
 .../examples/stream/upsertMeetupRsvp/README.md     |  66 +++
 .../upsertMeetupRsvp_realtime_table_config.json    |   3 +-
 19 files changed, 1201 insertions(+), 10 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index b4f6ae66f4a..46df92b2714 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -94,6 +94,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   UPSERT_QUERYABLE_DOCS_IN_SNAPSHOT_COUNT("upsertQueryableDocIdsInSnapshot", 
false),
   REALTIME_INGESTION_OFFSET_LAG("offsetLag", false,
       "The difference between latest message offset and the last consumed 
message offset."),
+  REALTIME_INGESTION_OOM_PROTECTION_ACTIVE("boolean", true,
+      "Binary indicator (1 or 0) for whether the server-wide realtime 
ingestion OOM throttle is active."),
   REALTIME_INGESTION_UPSTREAM_OFFSET("upstreamOffset", false, "The offset of 
the latest message in the upstream."),
   REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false, "The offset of 
the last consumed message."),
   REALTIME_INGESTION_DELAY_MS("milliseconds", false,
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java
index 796f26b02db..fd2227f3896 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java
@@ -58,6 +58,7 @@ import 
org.apache.pinot.spi.config.table.ingestion.ParallelSegmentConsumptionPol
 import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.Enablement;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
@@ -307,6 +308,7 @@ public class TableConfigSerDeUtilsTest {
           new 
StreamIngestionConfig(Collections.singletonList(Collections.singletonMap("streamType",
 "kafka")));
       streamIngestionConfig.setParallelSegmentConsumptionPolicy(
           ParallelSegmentConsumptionPolicy.ALLOW_DURING_BUILD_ONLY);
+      streamIngestionConfig.setOomProtection(Enablement.ENABLE);
       ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
       ingestionConfig.setFilterConfig(new FilterConfig("filterFunc(foo)"));
       ingestionConfig.setTransformConfigs(
@@ -499,6 +501,7 @@ public class TableConfigSerDeUtilsTest {
     
assertEquals(ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps().size(),
 1);
     
assertEquals(ingestionConfig.getStreamIngestionConfig().getParallelSegmentConsumptionPolicy(),
         ParallelSegmentConsumptionPolicy.ALLOW_DURING_BUILD_ONLY);
+    
assertEquals(ingestionConfig.getStreamIngestionConfig().getOomProtection(), 
Enablement.ENABLE);
   }
 
   private void checkTierConfigList(TableConfig tableConfig) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index 57cedd30812..add13516256 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -31,6 +31,7 @@ import 
org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import 
org.apache.pinot.core.data.manager.realtime.ServerIngestionOomProtectionManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
@@ -76,6 +77,7 @@ public class DefaultTableDataManagerProvider implements 
TableDataManagerProvider
       @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       BooleanSupplier isServerReadyToConsumeData, BooleanSupplier 
isServerReadyToServeQueries,
+      ServerIngestionOomProtectionManager.ServerThrottleState 
serverIngestionOomProtectionThrottleState,
       boolean enableAsyncSegmentRefresh, ServerReloadJobStatusCache 
reloadJobStatusCache) {
     TableDataManager tableDataManager;
     switch (tableConfig.getTableType()) {
@@ -95,7 +97,7 @@ public class DefaultTableDataManagerProvider implements 
TableDataManagerProvider
               StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, 
CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI));
         }
         tableDataManager = new 
RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToConsumeData,
-            isServerReadyToServeQueries);
+            isServerReadyToServeQueries, 
serverIngestionOomProtectionThrottleState);
         break;
       default:
         throw new IllegalStateException();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index b0e4d11e572..023430eccad 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -26,7 +26,9 @@ import java.util.function.BooleanSupplier;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import 
org.apache.pinot.core.data.manager.realtime.ServerIngestionOomProtectionManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
@@ -56,6 +58,7 @@ public interface TableDataManagerProvider {
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       BooleanSupplier isServerReadyToConsumeData,
       BooleanSupplier isServerReadyToServeQueries,
+      ServerIngestionOomProtectionManager.ServerThrottleState 
serverIngestionOomProtectionThrottleState,
       boolean enableAsyncSegmentRefresh,
       ServerReloadJobStatusCache reloadJobStatusCache);
 
@@ -65,6 +68,8 @@ public interface TableDataManagerProvider {
   @VisibleForTesting
   default TableDataManager getTableDataManager(TableConfig tableConfig, Schema 
schema) {
     return getTableDataManager(tableConfig, schema, new 
SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
-        null, null, () -> true, () -> true, false, new 
ServerReloadJobStatusCache("testInstance"));
+        null, null, () -> true, () -> true,
+        ServerIngestionOomProtectionManager.createServerThrottleState(null, 
ServerMetrics.get()), false,
+        new ServerReloadJobStatusCache("testInstance"));
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d918355b490..6246ff58570 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -268,6 +268,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
   private final BooleanSupplier _isReadyToConsumeData;
+  private ServerIngestionOomProtectionManager 
_serverIngestionOomProtectionManager;
   private final MutableSegmentImpl _realtimeSegment;
   private volatile StreamPartitionMsgOffset _currentOffset; // Next offset to 
be consumed
   private volatile State _state;
@@ -445,6 +446,14 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     }
   }
 
+  private boolean shouldStopWaitingForOomProtection() {
+    if (_shouldStop || _state != State.INITIAL_CONSUMING) {
+      return true;
+    }
+    return now() >= _consumeEndTime || _numRowsIndexed >= _segmentMaxRowCount 
|| _endOfPartitionGroup
+        || _forceCommitMessageReceived || !canAddMore();
+  }
+
   private void handleTransientStreamErrors(Exception e)
       throws Exception {
     _consecutiveErrorCount++;
@@ -484,6 +493,16 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
 
     _segmentLogger.info("Starting consumption loop start offset {}, 
finalOffset {}", _currentOffset, _finalOffset);
     while (!_shouldStop && !endCriteriaReached()) {
+      if (_state == State.INITIAL_CONSUMING && 
_serverIngestionOomProtectionManager != null) {
+        boolean waitedForOomProtection =
+            
_serverIngestionOomProtectionManager.waitIfProtectionNeeded(this::shouldStopWaitingForOomProtection);
+        if (_shouldStop || endCriteriaReached()) {
+          break;
+        }
+        if (waitedForOomProtection) {
+          _idleTimer.markStreamCreated();
+        }
+      }
       _serverMetrics.setValueOfTableGauge(_clientId, 
ServerGauge.LLC_PARTITION_CONSUMING, 1);
       // Consume for the next readTime ms, or we get to final offset, 
whichever happens earlier,
       // Update _currentOffset upon return from this method
@@ -763,6 +782,12 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     return _realtimeSegment.canAddMore();
   }
 
+  @VisibleForTesting
+  void setServerIngestionOomProtectionManager(
+      ServerIngestionOomProtectionManager serverIngestionOomProtectionManager) 
{
+    _serverIngestionOomProtectionManager = serverIngestionOomProtectionManager;
+  }
+
   public class PartitionConsumer implements Runnable {
     public void run() {
       long initialConsumptionEnd = 0L;
@@ -1743,6 +1768,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
     _partitionDedupMetadataManager = partitionDedupMetadataManager;
     _isReadyToConsumeData = isReadyToConsumeData;
+    _serverIngestionOomProtectionManager = 
realtimeTableDataManager.getServerIngestionOomProtectionManager();
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getInstanceId();
     _leaseExtender = 
SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 15ec71fe50f..6cdfd08b2e1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -50,6 +50,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.SegmentUtils;
@@ -139,12 +140,14 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
 
   private final BooleanSupplier _isServerReadyToConsumeData;
   private final BooleanSupplier _isServerReadyToServeQueries;
+  private final ServerIngestionOomProtectionManager.ServerThrottleState 
_serverIngestionOomProtectionThrottleState;
 
   // Object to track ingestion delay for all partitions
   private IngestionDelayTracker _ingestionDelayTracker;
 
   private TableDedupMetadataManager _tableDedupMetadataManager;
   private BooleanSupplier _isTableReadyToConsumeData;
+  private ServerIngestionOomProtectionManager 
_serverIngestionOomProtectionManager;
   private boolean _enforceConsumptionInOrder = false;
 
   public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
@@ -152,7 +155,9 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   }
 
   public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, 
BooleanSupplier isServerReadyToServeQueries) {
-    this(segmentBuildSemaphore, () -> true, isServerReadyToServeQueries);
+    this(segmentBuildSemaphore, () -> true, isServerReadyToServeQueries,
+        // Test/legacy: per-instance non-shared unregistered throttle; ignores 
cluster config; prod uses startup state.
+        ServerIngestionOomProtectionManager.createServerThrottleState(null, 
ServerMetrics.get()));
   }
 
   /**
@@ -161,10 +166,12 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
    *     entry of its consumer thread; once the gate clears, it is not 
consulted again for that segment.
    */
   public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, 
BooleanSupplier isServerReadyToConsumeData,
-      BooleanSupplier isServerReadyToServeQueries) {
+      BooleanSupplier isServerReadyToServeQueries,
+      ServerIngestionOomProtectionManager.ServerThrottleState 
serverIngestionOomProtectionThrottleState) {
     _segmentBuildSemaphore = segmentBuildSemaphore;
     _isServerReadyToConsumeData = isServerReadyToConsumeData;
     _isServerReadyToServeQueries = isServerReadyToServeQueries;
+    _serverIngestionOomProtectionThrottleState = 
serverIngestionOomProtectionThrottleState;
   }
 
   @Override
@@ -264,6 +271,9 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     }
     _isTableReadyToConsumeData = () -> 
readyForDedupOrPartialUpsert.getAsBoolean()
         && _isServerReadyToConsumeData.getAsBoolean();
+    _serverIngestionOomProtectionManager = new 
ServerIngestionOomProtectionManager(
+        () -> getCachedTableConfigAndSchema().getLeft(), () -> 
isUpsertEnabled() || isDedupEnabled(),
+        _serverIngestionOomProtectionThrottleState);
   }
 
   @VisibleForTesting
@@ -859,6 +869,10 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     return _isServerReadyToServeQueries;
   }
 
+  ServerIngestionOomProtectionManager getServerIngestionOomProtectionManager() 
{
+    return _serverIngestionOomProtectionManager;
+  }
+
   /**
    * Validate a schema against the table config for real-time record 
consumption.
    * Ideally, we should validate these things when schema is added or table is 
created, but either of these
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManager.java
new file mode 100644
index 00000000000..1a0b9845df5
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManager.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BooleanSupplier;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.Enablement;
+import org.apache.pinot.spi.utils.ResourceUsageUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/// Applies server-local backpressure to realtime ingestion while JVM heap 
usage is above a configured threshold.
+public class ServerIngestionOomProtectionManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerIngestionOomProtectionManager.class);
+  private static final String SERVER_INSTANCE_CONFIG_PREFIX =
+      CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + ".";
+  private static final Set<String> SERVER_INGESTION_OOM_PROTECTION_CONFIG_KEYS 
= Set.of(
+      CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_MODE,
+      
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD,
+      
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_RECOVERY_THRESHOLD,
+      
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS,
+      
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS);
+
+  // Server mode has the UPSERT_DEDUP_ONLY policy; table-level override 
intentionally only supports ENABLE/DISABLE.
+  enum ServerMode {
+    ENABLE, UPSERT_DEDUP_ONLY, DISABLE
+  }
+
+  private static final ServerMode DEFAULT_SERVER_MODE =
+      
ServerMode.valueOf(CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_MODE);
+
+  private final Supplier<TableConfig> _tableConfigSupplier;
+  private final BooleanSupplier _isUpsertOrDedupEnabledSupplier;
+  private final ServerThrottleState _serverThrottleState;
+
+  ServerIngestionOomProtectionManager(Supplier<TableConfig> 
tableConfigSupplier,
+      BooleanSupplier isUpsertOrDedupEnabledSupplier,
+      ServerThrottleState serverThrottleState) {
+    _tableConfigSupplier = tableConfigSupplier;
+    _isUpsertOrDedupEnabledSupplier = isUpsertOrDedupEnabledSupplier;
+    _serverThrottleState = serverThrottleState;
+  }
+
+  public boolean waitIfProtectionNeeded(BooleanSupplier stopCondition)
+      throws InterruptedException {
+    boolean waited = false;
+    while (!stopCondition.getAsBoolean() && shouldThrottle()) {
+      waited = true;
+      Thread.sleep(_serverThrottleState.getCheckIntervalMs());
+    }
+    return waited;
+  }
+
+  @VisibleForTesting
+  boolean shouldThrottle() {
+    return isEnabledForTable(_tableConfigSupplier.get()) && 
_serverThrottleState.shouldThrottle();
+  }
+
+  @VisibleForTesting
+  boolean isThrottling() {
+    return _serverThrottleState.isThrottling();
+  }
+
+  @VisibleForTesting
+  void resetMetrics() {
+    _serverThrottleState.resetMetrics();
+  }
+
+  private boolean isEnabledForTable(@Nullable TableConfig tableConfig) {
+    if (tableConfig == null || 
!TableNameBuilder.isRealtimeTableResource(tableConfig.getTableName())) {
+      return false;
+    }
+    return getTableEnablement(tableConfig).isEnabled(() -> switch 
(_serverThrottleState.getMode()) {
+      case ENABLE -> true;
+      case UPSERT_DEDUP_ONLY -> _isUpsertOrDedupEnabledSupplier.getAsBoolean();
+      case DISABLE -> false;
+    });
+  }
+
+  private static Enablement getTableEnablement(@Nullable TableConfig 
tableConfig) {
+    if (tableConfig == null) {
+      return Enablement.DEFAULT;
+    }
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    if (ingestionConfig == null) {
+      return Enablement.DEFAULT;
+    }
+    StreamIngestionConfig streamIngestionConfig = 
ingestionConfig.getStreamIngestionConfig();
+    return streamIngestionConfig != null ? 
streamIngestionConfig.getOomProtection() : Enablement.DEFAULT;
+  }
+
+  public static ServerThrottleState createServerThrottleState(
+      @Nullable PinotConfiguration instanceDataManagerConfig, ServerMetrics 
serverMetrics) {
+    return new ServerThrottleState(instanceDataManagerConfig, serverMetrics, 
ResourceUsageUtils::getUsedHeapSize,
+        ResourceUsageUtils::getMaxHeapSize, System::currentTimeMillis, 
System::gc);
+  }
+
+  private static boolean containsIngestionOomProtectionConfig(Set<String> 
configKeys) {
+    for (String key : configKeys) {
+      String unprefixedKey = key.startsWith(SERVER_INSTANCE_CONFIG_PREFIX)
+          ? key.substring(SERVER_INSTANCE_CONFIG_PREFIX.length())
+          : key;
+      if (SERVER_INGESTION_OOM_PROTECTION_CONFIG_KEYS.contains(unprefixedKey)) 
{
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static ServerMode getMode(String rawMode) {
+    if (rawMode == null) {
+      LOGGER.warn("Invalid server ingestion OOM protection mode: null. Falling 
back to: {}",
+          DEFAULT_SERVER_MODE);
+      return DEFAULT_SERVER_MODE;
+    }
+    try {
+      return ServerMode.valueOf(rawMode.toUpperCase(Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      LOGGER.warn("Invalid server ingestion OOM protection mode: {}. Falling 
back to: {}",
+          rawMode, DEFAULT_SERVER_MODE);
+      return DEFAULT_SERVER_MODE;
+    }
+  }
+
+  /// Server-wide realtime ingestion OOM protection state shared by all 
realtime table managers in one server process.
+  public static class ServerThrottleState implements 
PinotClusterConfigChangeListener {
+    private final ServerMetrics _serverMetrics;
+    private final PinotConfiguration _instanceDataManagerConfig;
+    private final LongSupplier _usedHeapSupplier;
+    private final LongSupplier _maxHeapSupplier;
+    private final LongSupplier _currentTimeMsSupplier;
+    private final Runnable _gcRunner;
+
+    private volatile ConfigSnapshot _config;
+    private volatile boolean _throttling;
+    private volatile long _lastCheckTimeMs = -1L;
+    private long _lastGcRequestTimeMs = -1L;
+    private volatile boolean _loggedInvalidHeapSize;
+    private volatile boolean _loggedInvalidThresholds;
+
+    ServerThrottleState(@Nullable PinotConfiguration 
instanceDataManagerConfig, ServerMetrics serverMetrics,
+        LongSupplier usedHeapSupplier, LongSupplier maxHeapSupplier, 
LongSupplier currentTimeMsSupplier,
+        Runnable gcRunner) {
+      _serverMetrics = serverMetrics;
+      _instanceDataManagerConfig =
+          instanceDataManagerConfig != null ? new 
PinotConfiguration(instanceDataManagerConfig.toMap())
+              : new PinotConfiguration();
+      _usedHeapSupplier = usedHeapSupplier;
+      _maxHeapSupplier = maxHeapSupplier;
+      _currentTimeMsSupplier = currentTimeMsSupplier;
+      _gcRunner = gcRunner;
+      _config = ConfigSnapshot.fromConfig(_instanceDataManagerConfig, 
Map.of());
+    }
+
+    @Override
+    public void onChange(Set<String> changedConfigs, Map<String, String> 
clusterConfigs) {
+      if (containsIngestionOomProtectionConfig(changedConfigs)) {
+        updateConfigFromClusterConfigs(Map.copyOf(clusterConfigs));
+      }
+    }
+
+    boolean shouldThrottle() {
+      long nowMs = _currentTimeMsSupplier.getAsLong();
+      if (isWithinCheckInterval(nowMs)) {
+        return _throttling;
+      }
+      synchronized (this) {
+        nowMs = _currentTimeMsSupplier.getAsLong();
+        if (isWithinCheckInterval(nowMs)) {
+          return _throttling;
+        }
+        _lastCheckTimeMs = nowMs;
+        return sampleAndUpdate(nowMs);
+      }
+    }
+
+    boolean isThrottling() {
+      return _throttling;
+    }
+
+    long getCheckIntervalMs() {
+      return _config._checkIntervalMs;
+    }
+
+    ServerMode getMode() {
+      return _config._mode;
+    }
+
+    void resetMetrics() {
+      updateThrottling(false, 0.0);
+    }
+
+    private void updateConfigFromClusterConfigs(Map<String, String> 
clusterConfigs) {
+      try {
+        ConfigSnapshot oldConfig = _config;
+        ConfigSnapshot config = 
ConfigSnapshot.fromConfig(_instanceDataManagerConfig, clusterConfigs);
+        _config = config;
+        _lastCheckTimeMs = -1L;
+        _loggedInvalidThresholds = false;
+        if (_throttling && (oldConfig._mode != config._mode || 
!config.isValidThresholdConfig())) {
+          updateThrottling(false, 0.0, config);
+        }
+        LOGGER.info("Updated server ingestion OOM protection config: mode={}, 
heapUsageThrottleThreshold={}%, "
+                + "heapUsageRecoveryThreshold={}%, checkIntervalMs={}, 
gcIntervalMs={}", config._mode,
+            Math.round(config._heapUsageThrottleThreshold * 100), 
Math.round(config._heapUsageRecoveryThreshold * 100),
+            config._checkIntervalMs, config._gcIntervalMs);
+      } catch (RuntimeException e) {
+        LOGGER.warn("Ignoring invalid server ingestion OOM protection cluster 
config update", e);
+      }
+    }
+
+    private boolean sampleAndUpdate(long nowMs) {
+      ConfigSnapshot config = _config;
+      if (!config.isValidThresholdConfig()) {
+        if (!_loggedInvalidThresholds) {
+          LOGGER.warn("Disabling server ingestion OOM protection because 
thresholds are invalid. "
+                  + "heapUsageThrottleThreshold: {}, 
heapUsageRecoveryThreshold: {}",
+              config._heapUsageThrottleThreshold, 
config._heapUsageRecoveryThreshold);
+          _loggedInvalidThresholds = true;
+        }
+        updateThrottling(false, 0.0);
+        return false;
+      }
+
+      long maxHeapBytes = _maxHeapSupplier.getAsLong();
+      if (maxHeapBytes <= 0) {
+        if (!_loggedInvalidHeapSize) {
+          LOGGER.warn("Disabling server ingestion OOM protection because max 
heap size is not available: {}",
+              maxHeapBytes);
+          _loggedInvalidHeapSize = true;
+        }
+        updateThrottling(false, 0.0);
+        return false;
+      }
+
+      double heapUsageRatio = Math.min(1.0, Math.max(0.0, (double) 
_usedHeapSupplier.getAsLong() / maxHeapBytes));
+      boolean shouldThrottle = _throttling ? heapUsageRatio > 
config._heapUsageRecoveryThreshold
+          : heapUsageRatio >= config._heapUsageThrottleThreshold;
+      updateThrottling(shouldThrottle, heapUsageRatio, config);
+      if (shouldThrottle) {
+        requestGcIfNeeded(heapUsageRatio, nowMs, config._gcIntervalMs);
+      }
+      return shouldThrottle;
+    }
+
+    private void updateThrottling(boolean shouldThrottle, double 
heapUsageRatio) {
+      updateThrottling(shouldThrottle, heapUsageRatio, _config);
+    }
+
+    private void updateThrottling(boolean shouldThrottle, double 
heapUsageRatio, ConfigSnapshot config) {
+      boolean wasThrottling = _throttling;
+      _throttling = shouldThrottle;
+      
_serverMetrics.setValueOfGlobalGauge(ServerGauge.REALTIME_INGESTION_OOM_PROTECTION_ACTIVE,
+          shouldThrottle ? 1L : 0L);
+      if (shouldThrottle && !wasThrottling) {
+        LOGGER.warn("Server ingestion OOM protection activated. Heap usage: 
{}%, threshold: {}%, "
+                + "recovery threshold: {}%", Math.round(heapUsageRatio * 100),
+            Math.round(config._heapUsageThrottleThreshold * 100),
+            Math.round(config._heapUsageRecoveryThreshold * 100));
+      } else if (shouldThrottle) {
+        LOGGER.warn("Server ingestion OOM protection remains active. Heap 
usage: {}%, recovery threshold: {}%",
+            Math.round(heapUsageRatio * 100), 
Math.round(config._heapUsageRecoveryThreshold * 100));
+      } else if (!shouldThrottle && wasThrottling) {
+        _lastGcRequestTimeMs = -1L;
+        LOGGER.info("Server ingestion OOM protection released. Heap usage: 
{}%, recovery threshold: {}%",
+            Math.round(heapUsageRatio * 100), 
Math.round(config._heapUsageRecoveryThreshold * 100));
+      }
+    }
+
+    private void requestGcIfNeeded(double heapUsageRatio, long nowMs, long 
gcIntervalMs) {
+      if (gcIntervalMs <= 0) {
+        return;
+      }
+      if (_lastGcRequestTimeMs >= 0 && nowMs >= _lastGcRequestTimeMs
+          && nowMs - _lastGcRequestTimeMs < gcIntervalMs) {
+        return;
+      }
+      _lastGcRequestTimeMs = nowMs;
+      LOGGER.warn("Requesting JVM GC while server ingestion OOM protection is 
active. Heap usage: {}%",
+          Math.round(heapUsageRatio * 100));
+      _gcRunner.run();
+    }
+
+    private boolean isWithinCheckInterval(long nowMs) {
+      return _lastCheckTimeMs >= 0 && nowMs - _lastCheckTimeMs < 
_config._checkIntervalMs;
+    }
+  }
+
+  private static class ConfigSnapshot {
+    private final ServerMode _mode;
+    private final double _heapUsageThrottleThreshold;
+    private final double _heapUsageRecoveryThreshold;
+    private final long _checkIntervalMs;
+    private final long _gcIntervalMs;
+
+    private ConfigSnapshot(PinotConfiguration config) {
+      _mode = getMode(config.getProperty(
+          
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_MODE,
+          
CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_MODE));
+      _heapUsageThrottleThreshold = config.getProperty(
+          
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD,
+          
CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD);
+      _heapUsageRecoveryThreshold = config.getProperty(
+          
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_RECOVERY_THRESHOLD,
+          
CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_RECOVERY_THRESHOLD);
+      long checkIntervalMs = config.getProperty(
+          
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS,
+          
CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS);
+      _checkIntervalMs = checkIntervalMs > 0 ? checkIntervalMs
+          : 
CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS;
+      _gcIntervalMs = config.getProperty(
+          
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS,
+          
CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS);
+    }
+
+    private static ConfigSnapshot fromConfig(PinotConfiguration 
instanceDataManagerConfig,
+        Map<String, String> clusterConfigs) {
+      Map<String, Object> mergedConfig = new 
HashMap<>(instanceDataManagerConfig.toMap());
+      for (String configKey : SERVER_INGESTION_OOM_PROTECTION_CONFIG_KEYS) {
+        String clusterConfigValue = getClusterConfigValue(clusterConfigs, 
configKey);
+        if (clusterConfigValue != null) {
+          mergedConfig.put(configKey, clusterConfigValue);
+        }
+      }
+      return new ConfigSnapshot(new PinotConfiguration(mergedConfig));
+    }
+
+    private static String getClusterConfigValue(Map<String, String> 
clusterConfigs, String configKey) {
+      String value = clusterConfigs.get(configKey);
+      if (value != null) {
+        return value;
+      }
+      return clusterConfigs.get(SERVER_INSTANCE_CONFIG_PREFIX + configKey);
+    }
+
+    private boolean isValidThresholdConfig() {
+      return _heapUsageThrottleThreshold > 0.0 && _heapUsageThrottleThreshold 
< 1.0
+          && _heapUsageRecoveryThreshold > 0.0 && _heapUsageRecoveryThreshold 
< _heapUsageThrottleThreshold;
+    }
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 367d71112db..41fb745acd4 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
+import java.util.function.BooleanSupplier;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
@@ -70,7 +71,10 @@ import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 
@@ -864,6 +868,109 @@ public class RealtimeSegmentDataManagerTest {
     }
   }
 
+  @Test
+  public void 
testServerIngestionOomProtectionWaitsAndResumesWhileInitialConsuming()
+      throws Exception {
+    TimeSupplier timeSupplier = new TimeSupplier();
+    try (FakeRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager(true, timeSupplier,
+        String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS), 
"10m", null)) {
+      segmentDataManager._stubConsumeLoop = false;
+      segmentDataManager._state.set(segmentDataManager, 
RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+
+      ServerIngestionOomProtectionManager oomProtectionManager = 
mock(ServerIngestionOomProtectionManager.class);
+      
when(oomProtectionManager.waitIfProtectionNeeded(any())).thenReturn(true, 
false);
+      
segmentDataManager.setServerIngestionOomProtectionManager(oomProtectionManager);
+
+      RealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
+      final LongMsgOffset endOffset =
+          new LongMsgOffset(START_OFFSET_VALUE + 
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+      segmentDataManager._consumeOffsets.add(endOffset);
+      SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(
+          new SegmentCompletionProtocol.Response.Params().withStatus(
+                  SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
+              .withStreamPartitionMsgOffset(endOffset.toString()));
+      segmentDataManager._responses.add(response);
+
+      consumer.run();
+
+      verify(oomProtectionManager, atLeast(1)).waitIfProtectionNeeded(any());
+      Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(),
+          FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    }
+  }
+
+  @Test
+  public void 
testServerIngestionOomProtectionStopPredicateDoesNotMutateEndCriteria()
+      throws Exception {
+    TimeSupplier timeSupplier = new TimeSupplier();
+    try (FakeRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager(true, timeSupplier,
+        String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS), 
"10m", null)) {
+      segmentDataManager._stubConsumeLoop = false;
+      segmentDataManager._state.set(segmentDataManager, 
RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+
+      ServerIngestionOomProtectionManager oomProtectionManager = 
mock(ServerIngestionOomProtectionManager.class);
+      
when(oomProtectionManager.waitIfProtectionNeeded(any())).thenAnswer(invocation 
-> {
+        BooleanSupplier stopCondition = invocation.getArgument(0);
+        long consumeEndTime = segmentDataManager.getConsumeEndTime();
+        timeSupplier.set(consumeEndTime);
+
+        Assert.assertTrue(stopCondition.getAsBoolean());
+        Assert.assertEquals(segmentDataManager.getConsumeEndTime(), 
consumeEndTime);
+        return true;
+      });
+      
segmentDataManager.setServerIngestionOomProtectionManager(oomProtectionManager);
+
+      RealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
+      final LongMsgOffset endOffset =
+          new LongMsgOffset(START_OFFSET_VALUE + 
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+      segmentDataManager._consumeOffsets.add(endOffset);
+      SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(
+          new SegmentCompletionProtocol.Response.Params().withStatus(
+                  SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
+              .withStreamPartitionMsgOffset(endOffset.toString()));
+      segmentDataManager._responses.add(response);
+
+      consumer.run();
+
+      verify(oomProtectionManager, atLeast(1)).waitIfProtectionNeeded(any());
+      Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(),
+          FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    }
+  }
+
+  @Test
+  public void testServerIngestionOomProtectionIsSkippedWhileCatchingUp()
+      throws Exception {
+    TimeSupplier timeSupplier = new TimeSupplier();
+    try (FakeRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager(true, timeSupplier,
+        String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS * 
2), "10m", null)) {
+      segmentDataManager._stubConsumeLoop = false;
+      segmentDataManager._state.set(segmentDataManager, 
RealtimeSegmentDataManager.State.CATCHING_UP);
+      LongMsgOffset finalOffset =
+          new LongMsgOffset(START_OFFSET_VALUE + 
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+      Field finalOffsetField = 
RealtimeSegmentDataManager.class.getDeclaredField("_finalOffset");
+      finalOffsetField.setAccessible(true);
+      finalOffsetField.set(segmentDataManager, finalOffset);
+
+      ServerIngestionOomProtectionManager oomProtectionManager = 
mock(ServerIngestionOomProtectionManager.class);
+      
when(oomProtectionManager.waitIfProtectionNeeded(any())).thenReturn(true);
+      
segmentDataManager.setServerIngestionOomProtectionManager(oomProtectionManager);
+
+      RealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
+      SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(
+          new SegmentCompletionProtocol.Response.Params().withStatus(
+                  SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
+              .withStreamPartitionMsgOffset(finalOffset.toString()));
+      segmentDataManager._responses.add(response);
+
+      consumer.run();
+
+      verify(oomProtectionManager, never()).waitIfProtectionNeeded(any());
+      Assert.assertEquals(((LongMsgOffset) 
segmentDataManager.getCurrentOffset()).getOffset(),
+          START_OFFSET_VALUE + 
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    }
+  }
+
   @Test
   public void testCompletionModeDownloadWithUrlValidation()
       throws Exception {
@@ -1267,6 +1374,10 @@ public class RealtimeSegmentDataManagerTest {
       setLong(endTime, "_consumeEndTime");
     }
 
+    public long getConsumeEndTime() {
+      return getLong("_consumeEndTime");
+    }
+
     public void setNumRowsConsumed(int numRows) {
       setInt(numRows, "_numRowsConsumed");
     }
@@ -1321,6 +1432,19 @@ public class RealtimeSegmentDataManagerTest {
       }
     }
 
+    private long getLong(String fieldName) {
+      try {
+        Field field = 
RealtimeSegmentDataManager.class.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field.getLong(this);
+      } catch (NoSuchFieldException e) {
+        Assert.fail();
+      } catch (IllegalAccessException e) {
+        Assert.fail();
+      }
+      throw new RuntimeException("Cannot get here");
+    }
+
     private void setOffset(long value, String fieldName) {
       try {
         Field field = 
RealtimeSegmentDataManager.class.getDeclaredField(fieldName);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManagerTest.java
new file mode 100644
index 00000000000..b2b640d37be
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManagerTest.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.Enablement;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class ServerIngestionOomProtectionManagerTest {
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+
+  @Test
+  public void testDefaultDisableModeDoesNotProtectTables() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    TableConfig tableConfig = buildTableConfig(null);
+
+    ServerIngestionOomProtectionManager statelessRealtimeManager =
+        buildManager(Collections.emptyMap(), tableConfig, false, 
usedHeapBytes, maxHeapBytes, nowMs);
+    assertFalse(statelessRealtimeManager.shouldThrottle());
+
+    ServerIngestionOomProtectionManager upsertOrDedupManager =
+        buildManager(Collections.emptyMap(), tableConfig, true, usedHeapBytes, 
maxHeapBytes, nowMs);
+    assertFalse(upsertOrDedupManager.shouldThrottle());
+  }
+
+  @Test
+  public void testUpsertDedupOnlyModeAppliesOnlyToUpsertOrDedupTables() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    TableConfig tableConfig = buildTableConfig(null);
+
+    ServerIngestionOomProtectionManager statelessRealtimeManager =
+        buildManager(upsertDedupOnlyServerConfig(), tableConfig, false, 
usedHeapBytes, maxHeapBytes,
+            nowMs);
+    assertFalse(statelessRealtimeManager.shouldThrottle());
+
+    ServerIngestionOomProtectionManager upsertOrDedupManager =
+        buildManager(upsertDedupOnlyServerConfig(), tableConfig, true, 
usedHeapBytes, maxHeapBytes,
+            nowMs);
+    assertTrue(upsertOrDedupManager.shouldThrottle());
+
+    
nowMs.addAndGet(CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS);
+    usedHeapBytes.set(91);
+    assertTrue(upsertOrDedupManager.shouldThrottle());
+
+    
nowMs.addAndGet(CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS);
+    usedHeapBytes.set(90);
+    assertFalse(upsertOrDedupManager.shouldThrottle());
+  }
+
+  @Test
+  public void testTableDisableOverridesServerPolicy() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+
+    ServerIngestionOomProtectionManager manager =
+        buildManager(upsertDedupOnlyServerConfig(), 
buildTableConfig(Enablement.DISABLE), true, usedHeapBytes,
+            maxHeapBytes, nowMs);
+
+    assertFalse(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testDisabledTableDoesNotUpdateGauges() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+
+    ServerIngestionOomProtectionManager manager =
+        buildManager(upsertDedupOnlyServerConfig(), 
buildTableConfig(Enablement.DISABLE), true, usedHeapBytes,
+            maxHeapBytes, nowMs, serverMetrics);
+
+    assertFalse(manager.shouldThrottle());
+    verifyNoInteractions(serverMetrics);
+  }
+
+  @Test
+  public void 
testSharedServerThrottleStateSamplesHeapOnceAcrossManagersWithinInterval() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    AtomicInteger usedHeapReads = new AtomicInteger();
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    ServerIngestionOomProtectionManager.ServerThrottleState 
serverThrottleState =
+        buildServerThrottleState(upsertDedupOnlyServerConfig(), () -> {
+          usedHeapReads.incrementAndGet();
+          return usedHeapBytes.get();
+        }, maxHeapBytes::get, nowMs, serverMetrics);
+    ServerIngestionOomProtectionManager manager1 =
+        buildManager(buildTableConfig(null), true, serverThrottleState);
+    ServerIngestionOomProtectionManager manager2 =
+        buildManager(buildTableConfig(null), true, serverThrottleState);
+
+    assertTrue(manager1.shouldThrottle());
+    assertTrue(manager2.shouldThrottle());
+    assertEquals(usedHeapReads.get(), 1);
+  }
+
+  @Test
+  public void testTableEnableUsesServerThresholds() {
+    AtomicLong usedHeapBytes = new AtomicLong(90);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+
+    ServerIngestionOomProtectionManager manager =
+        buildManager(Collections.emptyMap(), 
buildTableConfig(Enablement.ENABLE), false, usedHeapBytes,
+            maxHeapBytes, nowMs);
+
+    assertFalse(manager.shouldThrottle());
+
+    
nowMs.addAndGet(CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS);
+    usedHeapBytes.set(96);
+    assertTrue(manager.shouldThrottle());
+
+    
nowMs.addAndGet(CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS);
+    usedHeapBytes.set(91);
+    assertTrue(manager.shouldThrottle());
+
+    
nowMs.addAndGet(CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS);
+    usedHeapBytes.set(90);
+    assertFalse(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testGcRequestedWhenThrottlingAndRateLimited() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    AtomicInteger gcRequests = new AtomicInteger();
+    ServerIngestionOomProtectionManager manager =
+        buildManager(upsertDedupOnlyServerConfig(), buildTableConfig(null), 
true, usedHeapBytes,
+            maxHeapBytes, nowMs, mock(ServerMetrics.class), () -> 
gcRequests.incrementAndGet());
+
+    assertTrue(manager.shouldThrottle());
+    assertEquals(gcRequests.get(), 1);
+
+    
nowMs.addAndGet(CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS);
+    assertTrue(manager.shouldThrottle());
+    assertEquals(gcRequests.get(), 1);
+
+    
nowMs.addAndGet(CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS);
+    assertTrue(manager.shouldThrottle());
+    assertEquals(gcRequests.get(), 2);
+  }
+
+  @Test
+  public void testGcRequestCanBeDisabled() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    AtomicInteger gcRequests = new AtomicInteger();
+    Map<String, Object> serverConfig =
+        
upsertDedupOnlyServerConfig(CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS,
+            0L);
+    ServerIngestionOomProtectionManager manager =
+        buildManager(serverConfig, buildTableConfig(null), true, 
usedHeapBytes, maxHeapBytes, nowMs,
+            mock(ServerMetrics.class), () -> gcRequests.incrementAndGet());
+
+    assertTrue(manager.shouldThrottle());
+    assertEquals(gcRequests.get(), 0);
+  }
+
+  @Test
+  public void testGcRequestTimerResetsWhenThrottlingReleases() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    AtomicInteger gcRequests = new AtomicInteger();
+    ServerIngestionOomProtectionManager manager =
+        buildManager(upsertDedupOnlyServerConfig(), buildTableConfig(null), 
true, usedHeapBytes,
+            maxHeapBytes, nowMs, mock(ServerMetrics.class), () -> 
gcRequests.incrementAndGet());
+
+    assertTrue(manager.shouldThrottle());
+    assertEquals(gcRequests.get(), 1);
+
+    
nowMs.addAndGet(CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS);
+    usedHeapBytes.set(90);
+    assertFalse(manager.shouldThrottle());
+
+    
nowMs.addAndGet(CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS);
+    usedHeapBytes.set(96);
+    assertTrue(manager.shouldThrottle());
+    assertEquals(gcRequests.get(), 2);
+  }
+
+  @Test
+  public void testGcRequestIsRateLimitedAcrossManagers() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    AtomicInteger gcRequests = new AtomicInteger();
+    ServerIngestionOomProtectionManager.ServerThrottleState 
serverThrottleState =
+        buildServerThrottleState(upsertDedupOnlyServerConfig(), 
usedHeapBytes::get, maxHeapBytes::get, nowMs,
+            mock(ServerMetrics.class), () -> gcRequests.incrementAndGet());
+    ServerIngestionOomProtectionManager manager1 =
+        buildManager(buildTableConfig(null), true, serverThrottleState);
+    ServerIngestionOomProtectionManager manager2 =
+        buildManager(buildTableConfig(null), true, serverThrottleState);
+
+    assertTrue(manager1.shouldThrottle());
+    assertTrue(manager2.shouldThrottle());
+    assertEquals(gcRequests.get(), 1);
+  }
+
+  @Test
+  public void testServerEnableModeProtectsAllRealtimeTables() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    Map<String, Object> serverConfig =
+        serverModeConfig("ENABLE");
+
+    ServerIngestionOomProtectionManager manager =
+        buildManager(serverConfig, buildTableConfig(null), false, 
usedHeapBytes, maxHeapBytes,
+            nowMs);
+
+    assertTrue(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testClusterConfigCanDynamicallyUpdateServerMode() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    ServerIngestionOomProtectionManager.ServerThrottleState 
serverThrottleState =
+        buildServerThrottleState(Collections.emptyMap(), usedHeapBytes::get, 
maxHeapBytes::get, nowMs, serverMetrics);
+    ServerIngestionOomProtectionManager manager =
+        buildManager(buildTableConfig(null), true, serverThrottleState);
+
+    assertFalse(manager.shouldThrottle());
+
+    String modeConfigKey =
+        
fullServerInstanceConfigKey(CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_MODE);
+    serverThrottleState.onChange(Set.of(modeConfigKey), Map.of(modeConfigKey, 
"UPSERT_DEDUP_ONLY"));
+
+    assertTrue(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testClusterConfigCanDynamicallyUpdateServerThresholds() {
+    AtomicLong usedHeapBytes = new AtomicLong(92);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    ServerIngestionOomProtectionManager.ServerThrottleState 
serverThrottleState =
+        buildServerThrottleState(upsertDedupOnlyServerConfig(), 
usedHeapBytes::get, maxHeapBytes::get, nowMs,
+            serverMetrics);
+    ServerIngestionOomProtectionManager manager =
+        buildManager(buildTableConfig(null), true, serverThrottleState);
+
+    assertFalse(manager.shouldThrottle());
+
+    String throttleThresholdConfigKey = fullServerInstanceConfigKey(
+        
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD);
+    String recoveryThresholdConfigKey = fullServerInstanceConfigKey(
+        
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_RECOVERY_THRESHOLD);
+    serverThrottleState.onChange(Set.of(throttleThresholdConfigKey, 
recoveryThresholdConfigKey),
+        Map.of(throttleThresholdConfigKey, "0.90", recoveryThresholdConfigKey, 
"0.85"));
+
+    assertTrue(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testInvalidClusterConfigUpdateKeepsPreviousConfig() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    ServerIngestionOomProtectionManager.ServerThrottleState 
serverThrottleState =
+        buildServerThrottleState(upsertDedupOnlyServerConfig(), 
usedHeapBytes::get, maxHeapBytes::get, nowMs,
+            mock(ServerMetrics.class));
+    ServerIngestionOomProtectionManager manager =
+        buildManager(buildTableConfig(null), true, serverThrottleState);
+
+    String thresholdConfigKey = fullServerInstanceConfigKey(
+        
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD);
+    serverThrottleState.onChange(Set.of(thresholdConfigKey), 
Map.of(thresholdConfigKey, "not-a-double"));
+
+    assertTrue(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testClusterConfigDeletionFallsBackToInstanceConfig() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    ServerIngestionOomProtectionManager.ServerThrottleState 
serverThrottleState =
+        buildServerThrottleState(serverModeConfig("ENABLE"), 
usedHeapBytes::get, maxHeapBytes::get, nowMs,
+            serverMetrics);
+    ServerIngestionOomProtectionManager manager =
+        buildManager(buildTableConfig(null), false, serverThrottleState);
+
+    String modeConfigKey =
+        
fullServerInstanceConfigKey(CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_MODE);
+    serverThrottleState.onChange(Set.of(modeConfigKey), Map.of(modeConfigKey, 
"DISABLE"));
+    assertFalse(manager.shouldThrottle());
+
+    serverThrottleState.onChange(Set.of(modeConfigKey), 
Collections.emptyMap());
+    assertTrue(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testClusterConfigModeChangeClearsActiveThrottle() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    ServerIngestionOomProtectionManager.ServerThrottleState 
serverThrottleState =
+        buildServerThrottleState(serverModeConfig("ENABLE"), 
usedHeapBytes::get, maxHeapBytes::get, nowMs,
+            serverMetrics);
+    ServerIngestionOomProtectionManager manager =
+        buildManager(buildTableConfig(null), false, serverThrottleState);
+
+    assertTrue(manager.shouldThrottle());
+
+    String modeConfigKey =
+        
fullServerInstanceConfigKey(CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_MODE);
+    serverThrottleState.onChange(Set.of(modeConfigKey), Map.of(modeConfigKey, 
"DISABLE"));
+
+    assertFalse(manager.isThrottling());
+    
verify(serverMetrics).setValueOfGlobalGauge(ServerGauge.REALTIME_INGESTION_OOM_PROTECTION_ACTIVE,
 0L);
+  }
+
+  @Test
+  public void testTableEnableOverridesServerDisableMode() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+
+    ServerIngestionOomProtectionManager manager =
+        buildManager(Collections.emptyMap(), 
buildTableConfig(Enablement.ENABLE), true, usedHeapBytes,
+            maxHeapBytes, nowMs);
+
+    assertTrue(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testResetMetricsClearsProtectionGauges() {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    ServerIngestionOomProtectionManager manager =
+        buildManager(upsertDedupOnlyServerConfig(), buildTableConfig(null), 
true, usedHeapBytes,
+            maxHeapBytes, nowMs, serverMetrics);
+
+    assertTrue(manager.shouldThrottle());
+    manager.resetMetrics();
+
+    
verify(serverMetrics).setValueOfGlobalGauge(ServerGauge.REALTIME_INGESTION_OOM_PROTECTION_ACTIVE,
 0L);
+  }
+
+  @Test
+  public void testWaitIfProtectionNeededStopsWhenStopConditionIsMet()
+      throws Exception {
+    AtomicLong usedHeapBytes = new AtomicLong(96);
+    AtomicLong maxHeapBytes = new AtomicLong(100);
+    AtomicLong nowMs = new AtomicLong();
+    AtomicInteger stopChecks = new AtomicInteger();
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    Map<String, Object> serverConfig =
+        upsertDedupOnlyServerConfig(
+            
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS,
 1L);
+    ServerIngestionOomProtectionManager manager =
+        buildManager(serverConfig, buildTableConfig(null), true, 
usedHeapBytes, maxHeapBytes, nowMs,
+            serverMetrics);
+
+    assertTrue(manager.waitIfProtectionNeeded(() -> 
stopChecks.getAndIncrement() > 0));
+  }
+
+  private static ServerIngestionOomProtectionManager buildManager(Map<String, 
Object> serverConfig,
+      TableConfig tableConfig, boolean upsertOrDedupEnabled, AtomicLong 
usedHeapBytes, AtomicLong maxHeapBytes,
+      AtomicLong nowMs) {
+    return buildManager(serverConfig, tableConfig, upsertOrDedupEnabled, 
usedHeapBytes, maxHeapBytes, nowMs,
+        mock(ServerMetrics.class));
+  }
+
+  private static ServerIngestionOomProtectionManager buildManager(Map<String, 
Object> serverConfig,
+      TableConfig tableConfig, boolean upsertOrDedupEnabled, AtomicLong 
usedHeapBytes, AtomicLong maxHeapBytes,
+      AtomicLong nowMs, ServerMetrics serverMetrics) {
+    return buildManager(serverConfig, tableConfig, upsertOrDedupEnabled, 
usedHeapBytes, maxHeapBytes, nowMs,
+        serverMetrics, ServerIngestionOomProtectionManagerTest::noOpGc);
+  }
+
+  private static ServerIngestionOomProtectionManager buildManager(Map<String, 
Object> serverConfig,
+      TableConfig tableConfig, boolean upsertOrDedupEnabled, AtomicLong 
usedHeapBytes, AtomicLong maxHeapBytes,
+      AtomicLong nowMs, ServerMetrics serverMetrics, Runnable gcRunner) {
+    return new ServerIngestionOomProtectionManager(() -> tableConfig, () -> 
upsertOrDedupEnabled,
+        new ServerIngestionOomProtectionManager.ServerThrottleState(new 
PinotConfiguration(serverConfig), serverMetrics,
+            usedHeapBytes::get, maxHeapBytes::get, nowMs::get, gcRunner));
+  }
+
+  private static ServerIngestionOomProtectionManager buildManager(TableConfig 
tableConfig,
+      boolean upsertOrDedupEnabled, 
ServerIngestionOomProtectionManager.ServerThrottleState serverThrottleState) {
+    return new ServerIngestionOomProtectionManager(() -> tableConfig, () -> 
upsertOrDedupEnabled, serverThrottleState);
+  }
+
+  private static ServerIngestionOomProtectionManager.ServerThrottleState 
buildServerThrottleState(
+      Map<String, Object> serverConfig, LongSupplier usedHeapSupplier, 
LongSupplier maxHeapSupplier,
+      AtomicLong nowMs, ServerMetrics serverMetrics) {
+    return buildServerThrottleState(serverConfig, usedHeapSupplier, 
maxHeapSupplier, nowMs, serverMetrics,
+        ServerIngestionOomProtectionManagerTest::noOpGc);
+  }
+
+  private static ServerIngestionOomProtectionManager.ServerThrottleState 
buildServerThrottleState(
+      Map<String, Object> serverConfig, LongSupplier usedHeapSupplier, 
LongSupplier maxHeapSupplier,
+      AtomicLong nowMs, ServerMetrics serverMetrics, Runnable gcRunner) {
+    return new ServerIngestionOomProtectionManager.ServerThrottleState(new 
PinotConfiguration(serverConfig),
+        serverMetrics, usedHeapSupplier, maxHeapSupplier, nowMs::get, 
gcRunner);
+  }
+
+  private static Map<String, Object> upsertDedupOnlyServerConfig() {
+    return serverModeConfig("UPSERT_DEDUP_ONLY");
+  }
+
+  private static Map<String, Object> upsertDedupOnlyServerConfig(String key, 
Object value) {
+    return 
Map.of(CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_MODE, 
"UPSERT_DEDUP_ONLY", key,
+        value);
+  }
+
+  private static Map<String, Object> serverModeConfig(String mode) {
+    return 
Map.of(CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_MODE, 
mode);
+  }
+
+  private static String fullServerInstanceConfigKey(String key) {
+    return CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + 
key;
+  }
+
+  private static TableConfig buildTableConfig(@Nullable Enablement 
oomProtection) {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).build();
+    if (oomProtection != null) {
+      StreamIngestionConfig streamIngestionConfig =
+          new StreamIngestionConfig(List.of(Map.of("streamType", "kafka")));
+      streamIngestionConfig.setOomProtection(oomProtection);
+      IngestionConfig ingestionConfig = new IngestionConfig();
+      ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
+      tableConfig.setIngestionConfig(ingestionConfig);
+    }
+    return tableConfig;
+  }
+
+  private static void noOpGc() {
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
index 1eb5d402f5f..68d13ad05ff 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.data.manager.realtime.ConsumerCoordinator;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import 
org.apache.pinot.core.data.manager.realtime.ServerIngestionOomProtectionManager;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
@@ -51,6 +52,15 @@ public class FailureInjectingRealtimeTableDataManager 
extends RealtimeTableDataM
     _failureInjectingTableConfig = failureInjectingTableConfig;
   }
 
+  public FailureInjectingRealtimeTableDataManager(Semaphore 
segmentBuildSemaphore,
+      BooleanSupplier isServerReadyToConsumeData, BooleanSupplier 
isServerReadyToServeQueries,
+      ServerIngestionOomProtectionManager.ServerThrottleState 
serverIngestionOomProtectionThrottleState,
+      @Nullable FailureInjectingTableConfig failureInjectingTableConfig) {
+    super(segmentBuildSemaphore, isServerReadyToConsumeData, 
isServerReadyToServeQueries,
+        serverIngestionOomProtectionThrottleState);
+    _failureInjectingTableConfig = failureInjectingTableConfig;
+  }
+
   @Override
   protected RealtimeSegmentDataManager 
createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata,
       TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, Schema 
schema, LLCSegmentName llcSegmentName,
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index d73bcb38260..98ac676c922 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -33,6 +33,7 @@ import 
org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
 import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
+import 
org.apache.pinot.core.data.manager.realtime.ServerIngestionOomProtectionManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
@@ -84,6 +85,7 @@ public class FailureInjectingTableDataManagerProvider 
implements TableDataManage
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       BooleanSupplier isServerReadyToConsumeData,
       BooleanSupplier isServerReadyToServeQueries,
+      ServerIngestionOomProtectionManager.ServerThrottleState 
serverIngestionOomProtectionThrottleState,
       boolean enableAsyncSegmentRefresh,
       ServerReloadJobStatusCache reloadJobStatusCache) {
     TableDataManager tableDataManager;
@@ -116,8 +118,8 @@ public class FailureInjectingTableDataManagerProvider 
implements TableDataManage
           }
         }
         tableDataManager =
-            new 
FailureInjectingRealtimeTableDataManager(_segmentBuildSemaphore, 
isServerReadyToServeQueries,
-                failureInjectingTableConfig);
+            new 
FailureInjectingRealtimeTableDataManager(_segmentBuildSemaphore, 
isServerReadyToConsumeData,
+                isServerReadyToServeQueries, 
serverIngestionOomProtectionThrottleState, failureInjectingTableConfig);
         break;
       default:
         throw new IllegalStateException();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 6acc2a2b422..ff15a2fd0f4 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -486,11 +486,12 @@ public final class TableConfigUtils {
         IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
         Preconditions.checkState(indexingConfig == null || 
MapUtils.isEmpty(indexingConfig.getStreamConfigs()),
             "Should not use indexingConfig#getStreamConfigs if 
ingestionConfig#StreamIngestionConfig is provided");
-        List<Map<String, String>> streamConfigMaps = 
ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
+        StreamIngestionConfig streamIngestionConfig = 
ingestionConfig.getStreamIngestionConfig();
+        List<Map<String, String>> streamConfigMaps = 
streamIngestionConfig.getStreamConfigMaps();
         Preconditions.checkState(!streamConfigMaps.isEmpty(), "Must have at 
least 1 stream in REALTIME table");
         // TODO: for multiple stream configs, validate them
 
-        boolean isPauselessEnabled = 
ingestionConfig.getStreamIngestionConfig().isPauselessConsumptionEnabled();
+        boolean isPauselessEnabled = 
streamIngestionConfig.isPauselessConsumptionEnabled();
         if (isPauselessEnabled) {
           int replication = tableConfig.getReplication();
           // We are checking for this only when replication is greater than 1 
because in test environments
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index c7cab544ed3..888c2de86d5 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -963,6 +963,31 @@ public class TableConfigUtilsTest {
     }
   }
 
+  @Test
+  public void ingestionServerIngestionOomProtectionTest() {
+    Schema schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
+    StreamIngestionConfig streamIngestionConfig =
+        new 
StreamIngestionConfig(Collections.singletonList(getStreamConfigs()));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setIngestionConfig(ingestionConfig)
+        .build();
+    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+
+    streamIngestionConfig.setOomProtection(Enablement.ENABLE);
+    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+
+    streamIngestionConfig.setOomProtection(Enablement.DISABLE);
+    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+
+    streamIngestionConfig.setOomProtection(Enablement.DEFAULT);
+    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+
+    streamIngestionConfig.setOomProtection(null);
+    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+  }
+
   @Test
   public void ingestionBatchConfigsTest() {
     Schema schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index b22f220a4df..cfbbc349c88 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -795,6 +795,10 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     PinotClusterConfigChangeListener serverRateLimitConfigChangeListener =
         new ServerRateLimitConfigChangeListener(_serverMetrics);
     
_clusterConfigChangeHandler.registerClusterConfigChangeListener(serverRateLimitConfigChangeListener);
+    if (instanceDataManager instanceof HelixInstanceDataManager) {
+      _clusterConfigChangeHandler.registerClusterConfigChangeListener(
+          ((HelixInstanceDataManager) 
instanceDataManager).getServerIngestionOomProtectionThrottleState());
+    }
 
     // Register query killing manager for dynamic config updates 
(threshold/mode changes via ZK)
     if (_queryKillingManager != null) {
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index c55adf58c4c..04cc89a7723 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -57,6 +57,7 @@ import 
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import 
org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
 import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
+import 
org.apache.pinot.core.data.manager.realtime.ServerIngestionOomProtectionManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
@@ -105,6 +106,7 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
   private SegmentUploader _segmentUploader;
   private BooleanSupplier _isServerReadyToConsumeData = () -> false;
   private BooleanSupplier _isServerReadyToServeQueries = () -> false;
+  private ServerIngestionOomProtectionManager.ServerThrottleState 
_serverIngestionOomProtectionThrottleState;
 
   // Fixed size LRU cache for storing last N errors on the instance.
   // Key is TableNameWithType-SegmentName pair.
@@ -143,6 +145,8 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     _instanceId = _instanceDataManagerConfig.getInstanceId();
     _helixManager = helixManager;
     _reloadJobStatusCache = requireNonNull(reloadJobStatusCache, 
"reloadJobStatusCache cannot be null");
+    _serverIngestionOomProtectionThrottleState =
+        ServerIngestionOomProtectionManager.createServerThrottleState(config, 
serverMetrics);
     String tableDataManagerProviderClass = 
_instanceDataManagerConfig.getTableDataManagerProviderClass();
     LOGGER.info("Initializing table data manager provider of class: {}", 
tableDataManagerProviderClass);
     _tableDataManagerProvider = 
PluginManager.get().createInstance(tableDataManagerProviderClass);
@@ -186,6 +190,10 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
         
.expireAfterWrite(_instanceDataManagerConfig.getDeletedTablesCacheTtlMinutes(), 
TimeUnit.MINUTES).build();
   }
 
+  ServerIngestionOomProtectionManager.ServerThrottleState 
getServerIngestionOomProtectionThrottleState() {
+    return _serverIngestionOomProtectionThrottleState;
+  }
+
   @VisibleForTesting
   void initInstanceDataDir(File instanceDataDir) {
     if (!instanceDataDir.exists()) {
@@ -361,7 +369,8 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     TableDataManager tableDataManager =
         _tableDataManagerProvider.getTableDataManager(tableConfig, schema, 
_segmentReloadSemaphore,
             _segmentReloadRefreshExecutor, _segmentPreloadExecutor, 
_errorCache, _isServerReadyToConsumeData,
-            _isServerReadyToServeQueries, _enableAsyncSegmentRefresh, 
_reloadJobStatusCache);
+            _isServerReadyToServeQueries, 
_serverIngestionOomProtectionThrottleState, _enableAsyncSegmentRefresh,
+            _reloadJobStatusCache);
     tableDataManager.start();
     LOGGER.info("Created table data manager for table: {}", tableNameWithType);
     return tableDataManager;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
index 2cc00498b0c..d6507061060 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.config.BaseJsonConfig;
 import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
+import org.apache.pinot.spi.utils.Enablement;
 
 
 /**
@@ -66,6 +67,11 @@ public class StreamIngestionConfig extends BaseJsonConfig {
       + " partition during realtime ingestion. Defaults to false.")
   private boolean _dropRecordOnPartitionMismatch;
 
+  @JsonPropertyDescription("Optional table-level enablement override for 
server-side ingestion OOM protection. "
+      + "Supported values are ENABLE, DISABLE and DEFAULT. If unset or 
DEFAULT, the table follows the server-level "
+      + "mode.")
+  private Enablement _oomProtection = Enablement.DEFAULT;
+
   @JsonCreator
   public StreamIngestionConfig(@JsonProperty("streamConfigMaps") 
List<Map<String, String>> streamConfigMaps) {
     _streamConfigMaps = streamConfigMaps;
@@ -148,4 +154,12 @@ public class StreamIngestionConfig extends BaseJsonConfig {
   public void setDropRecordOnPartitionMismatch(boolean 
dropRecordOnPartitionMismatch) {
     _dropRecordOnPartitionMismatch = dropRecordOnPartitionMismatch;
   }
+
+  public Enablement getOomProtection() {
+    return _oomProtection == null ? Enablement.DEFAULT : _oomProtection;
+  }
+
+  public void setOomProtection(@Nullable Enablement oomProtection) {
+    _oomProtection = oomProtection == null ? Enablement.DEFAULT : 
oomProtection;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 3b2d69a33b5..33f94493840 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1579,6 +1579,24 @@ public class CommonConstants {
     // Default to 0.0 (no limit)
     public static final double DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT = 0.0;
 
+    // Configs for server-side realtime ingestion OOM protection. These are 
consumed from the instance data manager
+    // config subset, so the user-facing property prefix is 
pinot.server.instance.
+    public static final String CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_MODE =
+        "ingestion.oom.protection.mode";
+    public static final String DEFAULT_SERVER_INGESTION_OOM_PROTECTION_MODE = 
"DISABLE";
+    public static final String 
CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD =
+        "ingestion.oom.protection.heapUsageThrottleThreshold";
+    public static final double 
DEFAULT_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD = 0.95;
+    public static final String 
CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_RECOVERY_THRESHOLD =
+        "ingestion.oom.protection.heapUsageRecoveryThreshold";
+    public static final double 
DEFAULT_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_RECOVERY_THRESHOLD = 0.90;
+    public static final String 
CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS =
+        "ingestion.oom.protection.checkIntervalMs";
+    public static final long 
DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS = 1_000L;
+    public static final String 
CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS =
+        "ingestion.oom.protection.gcIntervalMs";
+    public static final long 
DEFAULT_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS = 30_000L;
+
     public static final String CONFIG_OF_MMAP_DEFAULT_ADVICE = 
"pinot.server.mmap.advice.default";
     public static final String PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY = 
"pinot.server.segment.fetcher";
 
diff --git 
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/README.md 
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/README.md
new file mode 100644
index 00000000000..106c1dfab54
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/README.md
@@ -0,0 +1,66 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+# Server ingestion OOM protection
+
+Server ingestion OOM protection applies local backpressure to realtime 
ingestion when the server JVM heap is above the
+configured threshold. It is disabled by default. Set the server mode to 
`UPSERT_DEDUP_ONLY` to protect upsert and dedup
+realtime tables, or `ENABLE` to protect all realtime tables. When active, the 
consuming loop waits and checks heap usage
+again at the configured interval, then resumes ingestion after heap usage 
reaches the recovery threshold. While
+throttled, Pinot also requests JVM garbage collection at a rate-limited 
interval so a mostly ingestion-only server does
+not stay paused forever with reclaimable garbage still counted as used heap.
+
+Server-level properties are set on the server instance config:
+
+```properties
+pinot.server.instance.ingestion.oom.protection.mode=UPSERT_DEDUP_ONLY
+pinot.server.instance.ingestion.oom.protection.heapUsageThrottleThreshold=0.95
+pinot.server.instance.ingestion.oom.protection.heapUsageRecoveryThreshold=0.90
+pinot.server.instance.ingestion.oom.protection.checkIntervalMs=1000
+pinot.server.instance.ingestion.oom.protection.gcIntervalMs=30000
+```
+
+The same server properties can be set in the Pinot cluster config and updated 
dynamically at runtime. Runtime cluster
+config changes use the same full `pinot.server.instance.*` property names:
+
+```json
+{
+  "pinot.server.instance.ingestion.oom.protection.mode": "UPSERT_DEDUP_ONLY",
+  "pinot.server.instance.ingestion.oom.protection.heapUsageThrottleThreshold": 
"0.95",
+  "pinot.server.instance.ingestion.oom.protection.heapUsageRecoveryThreshold": 
"0.90",
+  "pinot.server.instance.ingestion.oom.protection.checkIntervalMs": "1000",
+  "pinot.server.instance.ingestion.oom.protection.gcIntervalMs": "30000"
+}
+```
+
+Use `pinot.server.instance.ingestion.oom.protection.mode=ENABLE` to apply the 
server-level policy to all realtime tables.
+Use `pinot.server.instance.ingestion.oom.protection.mode=DISABLE` to disable 
the server-level policy. Set
+`pinot.server.instance.ingestion.oom.protection.gcIntervalMs=0` to disable the 
explicit GC request while throttled.
+
+Each realtime table can override the server policy under 
`ingestionConfig.streamIngestionConfig`:
+
+```json
+"oomProtection": "ENABLE"
+```
+
+If table `oomProtection` is unset or `DEFAULT`, the table follows the server 
mode. Set it to `ENABLE` to
+protect this table even when the server mode is `DISABLE` or would otherwise 
skip it, or `DISABLE` to turn protection
+off for this table. Thresholds are configured at the server level only.
diff --git 
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
index 1d984b7129f..d5b9c62aaec 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
@@ -50,7 +50,8 @@
           "stream.kafka.zk.broker.url": "localhost:2191/kafka",
           "stream.kafka.broker.list": "localhost:19092"
         }
-      ]
+      ],
+      "oomProtection": "ENABLE"
     }
   },
   "upsertConfig": {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to