This is an automated email from the ASF dual-hosted git repository.

zhenyu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7ce00b2233 feat:Add SpRateLimiter and SpMemoryManager (#3796)
7ce00b2233 is described below

commit 7ce00b2233a93b80ba70ee7a2d8207d08a14474c
Author: xinhao liu <[email protected]>
AuthorDate: Fri Oct 31 17:30:06 2025 +0800

    feat:Add SpRateLimiter and SpMemoryManager (#3796)
---
 pom.xml                                            |   7 +
 .../apache/streampipes/commons/constants/Envs.java |  26 +-
 .../commons/environment/DefaultEnvironment.java    |  89 +++++-
 .../commons/environment/Environment.java           |  27 +-
 .../variable/LongEnvironmentVariable.java          |  13 +-
 .../commons/exceptions/SpHttpErrorStatusCode.java  |   2 +-
 .../prometheus/service/ElementServiceStats.java    |   2 -
 .../spmemorymanager/SpMemoryManagerMetrics.java    |  44 +++
 .../spmemorymanager/SpMemoryManagerStats.java      |  66 ++++
 .../spratelimiter/SpRateLimiterMetrics.java        |  44 +++
 .../spratelimiter/SpRateLimiterStats.java          |  66 ++++
 streampipes-extensions-api/pom.xml                 |  15 +
 .../extensions/api/limiter/SpRateLimiter.java      | 307 +++++++++++++++++++
 .../api/memorymanager/SpMemoryManager.java         | 331 +++++++++++++++++++++
 .../connect/HttpServerAdapterManagement.java       |   2 +-
 .../monitoring/ServiceLoadDataReportGenerator.java |   1 +
 .../rocketmq/adapter/RocketMQConsumer.java         |  10 +-
 .../rocketmq/sink/RocketMQPublisherSink.java       |   2 +-
 .../messaging/jms/ActiveMQConsumer.java            |  10 +-
 .../messaging/kafka/SpKafkaConsumer.java           |   9 +-
 .../messaging/pulsar/PulsarConsumer.java           |  10 +-
 .../messaging/InternalEventProcessor.java          |   2 +-
 .../StreamPipesExtensionsServiceBase.java          |   2 +
 .../standalone/function/StreamPipesFunction.java   |   2 -
 .../routing/StandaloneSpInputCollector.java        |   7 +-
 .../runtime/StandaloneEventProcessorRuntime.java   |   9 +
 .../runtime/StandaloneEventSinkRuntime.java        |   9 +
 27 files changed, 1092 insertions(+), 22 deletions(-)

diff --git a/pom.xml b/pom.xml
index c968f7e29e..367abf1482 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@
         <eclipse.milo.version>0.6.16</eclipse.milo.version>
         <error-prone.version>2.10.0</error-prone.version>
         <file-management.version>3.1.0</file-management.version>
+        <findbugs.version>3.0.2</findbugs.version>
         <flink.version>1.13.5</flink.version>
         <fogsy-qudt.version>1.0</fogsy-qudt.version>
         <geojson-jackson.version>1.14</geojson-jackson.version>
@@ -240,6 +241,12 @@
                 <artifactId>gson</artifactId>
                 <version>${gson.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.google.code.findbugs</groupId>
+                <artifactId>jsr305</artifactId>
+                <version>${findbugs.version}</version>
+                <scope>compile</scope>
+            </dependency>
             <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 42c5522d72..69b76952bd 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -170,10 +170,30 @@ public enum Envs {
   SP_LOGGING_FILE_PREFIX("SP_LOGGING_FILE_PREFIX", "streampipes"),
   SP_LOGGING_FILE_DIR("SP_LOGGING_FILE_DIR", "logs"),
   SP_LOGGING_FILE_PATTERN(
-      "SP_LOGGING_FILE_PATTERN", "%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"
-  );
-
+      "SP_LOGGING_FILE_PATTERN",
+      "%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"
+  ),
 
+  //SpRateLimiter
+  SP_RATE_LIMITER_DEFAULT_WARMUP_PERIOD("SP_RATE_LIMITER_DEFAULT_WARMUP_PERIOD", "1000"),
+  SP_RATE_LIMITER_SCHEDULER_INITIAL_DELAY_SECONDS("SP_RATE_LIMITER_SCHEDULER_INITIAL_DELAY_SECONDS", "0"),
+  SP_RATE_LIMITER_SCHEDULER_PERIOD_SECONDS("SP_RATE_LIMITER_SCHEDULER_PERIOD_SECONDS", "15"),
+  SP_RATE_LIMITER_STATS_RESET_THRESHOLD("SP_RATE_LIMITER_STATS_RESET_THRESHOLD", "1000"),
+  SP_RATE_LIMITER_STATS_RESET_FACTOR("SP_RATE_LIMITER_STATS_RESET_FACTOR", "999"),
+  SP_RATE_LIMITER_STATS_RESET_DIVISOR("SP_RATE_LIMITER_STATS_RESET_DIVISOR", "1000"),
+  SP_RATE_LIMITER_SHUTDOWN_TIMEOUT_SECONDS("SP_RATE_LIMITER_SHUTDOWN_TIMEOUT_SECONDS", "5"),
+  SP_RATE_LIMITER_TIMEOUT_MS("SP_RATE_LIMITER_TIMEOUT_MS", "1000"),
+  SP_RATE_LIMITER_PERMITS_SET_PERCENTAGE("SP_RATE_LIMITER_PERMITS_SET_PERCENTAGE", "0.7"),
+
+  //SpMemoryManager
+  SP_MEMORY_MANAGER_DEFAULT_INITIAL_MEMORY("SP_MEMORY_MANAGER_DEFAULT_INITIAL_MEMORY", "1073741824"),
+  SP_MEMORY_MANAGER_WAIT_TIMEOUT_MS("SP_MEMORY_MANAGER_WAIT_TIMEOUT_MS", "1000"),
+  SP_MEMORY_SCHEDULER_INITIAL_DELAY_SECONDS("SP_MEMORY_SCHEDULER_INITIAL_DELAY_SECONDS", "0"),
+  SP_MEMORY_SCHEDULER_PERIOD_SECONDS("SP_MEMORY_SCHEDULER_PERIOD_SECONDS", "15"),
+  SP_MEMORY_BYTES_TO_MB("SP_MEMORY_BYTES_TO_MB", "1048576"),
+  SP_MEMORY_MANAGER_SHUTDOWN_TIMEOUT_SECONDS("SP_MEMORY_MANAGER_SHUTDOWN_TIMEOUT_SECONDS", "5"),
+  SP_MEMORY_MANAGER_USAGE_THRESHOLD("SP_MEMORY_MANAGER_USAGE_THRESHOLD", "0.9"),
+  SP_MEMORY_WARNING_THRESHOLD("SP_MEMORY_WARNING_THRESHOLD", "0.8");
 
   private final String envVariableName;
   private String defaultValue;
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index e33ffd4199..b04aec1da8 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -22,9 +22,10 @@ import org.apache.streampipes.commons.environment.model.OAuthConfiguration;
 import org.apache.streampipes.commons.environment.parser.OAuthConfigurationParser;
 import org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
 import org.apache.streampipes.commons.environment.variable.DoubleEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.FloatEnvironmentVariable;
 import org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import org.apache.streampipes.commons.environment.variable.LongEnvironmentVariable;
 import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
-import org.apache.streampipes.commons.environment.variable.FloatEnvironmentVariable;
 
 import java.util.List;
 
@@ -456,7 +457,6 @@ public class DefaultEnvironment implements Environment {
     return new StringEnvironmentVariable(Envs.SP_RETENTION_LOCAL_DIR);
   }
 
-
   @Override
   public DoubleEnvironmentVariable getDirMemoryResourceWeight() {
     return new DoubleEnvironmentVariable(Envs.DIR_MEMORY_RESOURCE_WEIGHT);
@@ -517,6 +517,91 @@ public class DefaultEnvironment implements Environment {
     return new BooleanEnvironmentVariable(Envs.LOAD_MANAGER_ENABLE);
   }
 
+  @Override
+  public LongEnvironmentVariable getRateLimiterDefaultWarmupPeriod() {
+    return new 
LongEnvironmentVariable(Envs.SP_RATE_LIMITER_DEFAULT_WARMUP_PERIOD);
+  }
+
+  @Override
+  public IntEnvironmentVariable getRateLimiterSchedulerInitialDelaySeconds() {
+    return new 
IntEnvironmentVariable(Envs.SP_RATE_LIMITER_SCHEDULER_INITIAL_DELAY_SECONDS);
+  }
+
+  @Override
+  public IntEnvironmentVariable getRateLimiterSchedulerPeriodSeconds() {
+        return new 
IntEnvironmentVariable(Envs.SP_RATE_LIMITER_SCHEDULER_PERIOD_SECONDS);
+  }
+
+  @Override
+  public IntEnvironmentVariable getRateLimiterStatsResetThreshold() {
+    return new 
IntEnvironmentVariable(Envs.SP_RATE_LIMITER_STATS_RESET_THRESHOLD);
+  }
+
+  @Override
+  public IntEnvironmentVariable getRateLimiterStatsResetFactor() {
+    return new IntEnvironmentVariable(Envs.SP_RATE_LIMITER_STATS_RESET_FACTOR);
+  }
+
+  @Override
+  public IntEnvironmentVariable getRateLimiterStatsResetDivisor() {
+    return new 
IntEnvironmentVariable(Envs.SP_RATE_LIMITER_STATS_RESET_DIVISOR);
+  }
+
+  @Override
+  public IntEnvironmentVariable getRateLimiterShutdownTimeoutSeconds() {
+    return new 
IntEnvironmentVariable(Envs.SP_RATE_LIMITER_SHUTDOWN_TIMEOUT_SECONDS);
+  }
+
+  @Override
+  public LongEnvironmentVariable getRateLimiterTimeoutMs() {
+    return new LongEnvironmentVariable(Envs.SP_RATE_LIMITER_TIMEOUT_MS);
+  }
+
+  @Override
+  public DoubleEnvironmentVariable getRateLimiterPermitsSetPercentage() {
+      return new 
DoubleEnvironmentVariable(Envs.SP_RATE_LIMITER_PERMITS_SET_PERCENTAGE);
+  }
+
+  @Override
+  public LongEnvironmentVariable getMemoryManagerDefaultInitialMemory() {
+    return new 
LongEnvironmentVariable(Envs.SP_MEMORY_MANAGER_DEFAULT_INITIAL_MEMORY);
+  }
+
+  @Override
+  public LongEnvironmentVariable getMemoryManagerWaitTimeoutMs() {
+    return new LongEnvironmentVariable(Envs.SP_MEMORY_MANAGER_WAIT_TIMEOUT_MS);
+  }
+
+  @Override
+  public IntEnvironmentVariable getMemorySchedulerInitialDelaySeconds() {
+    return new 
IntEnvironmentVariable(Envs.SP_MEMORY_SCHEDULER_INITIAL_DELAY_SECONDS);
+  }
+
+  @Override
+  public IntEnvironmentVariable getMemorySchedulerPeriodSeconds() {
+    return new IntEnvironmentVariable(Envs.SP_MEMORY_SCHEDULER_PERIOD_SECONDS);
+  }
+
+  @Override
+  public LongEnvironmentVariable getMemoryBytesToMb() {
+    return new LongEnvironmentVariable(Envs.SP_MEMORY_BYTES_TO_MB);
+  }
+
+  @Override
+  public IntEnvironmentVariable getMemoryManagerShutdownTimeoutSeconds() {
+    return new 
IntEnvironmentVariable(Envs.SP_MEMORY_MANAGER_SHUTDOWN_TIMEOUT_SECONDS);
+  }
+
+  @Override
+  public DoubleEnvironmentVariable getMemoryManagerUsageThreshold() {
+    return new 
DoubleEnvironmentVariable(Envs.SP_MEMORY_MANAGER_USAGE_THRESHOLD);
+  }
+
+  @Override
+  public DoubleEnvironmentVariable getMemoryWarningThreshold() {
+    return new DoubleEnvironmentVariable(Envs.SP_MEMORY_WARNING_THRESHOLD);
+    }
+
   @Override
   public StringEnvironmentVariable getDatalakeSchedulerCron() {
     return new StringEnvironmentVariable(Envs.SP_DATALAKE_SCHEDULER_CRON);
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index cf2f09a055..79cc859251 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -18,7 +18,12 @@
 package org.apache.streampipes.commons.environment;
 
 import org.apache.streampipes.commons.environment.model.OAuthConfiguration;
-import org.apache.streampipes.commons.environment.variable.*;
+import 
org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
+import 
org.apache.streampipes.commons.environment.variable.DoubleEnvironmentVariable;
+import 
org.apache.streampipes.commons.environment.variable.FloatEnvironmentVariable;
+import 
org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import 
org.apache.streampipes.commons.environment.variable.LongEnvironmentVariable;
+import 
org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
 
 import java.util.List;
 
@@ -234,5 +239,25 @@ public interface Environment {
 
   BooleanEnvironmentVariable getLoadManagerEnable();
 
+  //SpRateLimiter
+  LongEnvironmentVariable getRateLimiterDefaultWarmupPeriod();
+  IntEnvironmentVariable getRateLimiterSchedulerInitialDelaySeconds();
+  IntEnvironmentVariable getRateLimiterSchedulerPeriodSeconds();
+  IntEnvironmentVariable getRateLimiterStatsResetThreshold();
+  IntEnvironmentVariable getRateLimiterStatsResetFactor();
+  IntEnvironmentVariable getRateLimiterStatsResetDivisor();
+  IntEnvironmentVariable getRateLimiterShutdownTimeoutSeconds();
+  LongEnvironmentVariable getRateLimiterTimeoutMs();
+  DoubleEnvironmentVariable getRateLimiterPermitsSetPercentage();
+
+  //SpMemoryManager
+  LongEnvironmentVariable getMemoryManagerDefaultInitialMemory();
+  LongEnvironmentVariable getMemoryManagerWaitTimeoutMs();
+  IntEnvironmentVariable getMemorySchedulerInitialDelaySeconds();
+  IntEnvironmentVariable getMemorySchedulerPeriodSeconds();
+  LongEnvironmentVariable getMemoryBytesToMb();
+  IntEnvironmentVariable getMemoryManagerShutdownTimeoutSeconds();
+  DoubleEnvironmentVariable getMemoryManagerUsageThreshold();
+  DoubleEnvironmentVariable getMemoryWarningThreshold();
   StringEnvironmentVariable getDatalakeSchedulerCron();
 }
diff --git a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/LongEnvironmentVariable.java
similarity index 70%
copy from streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java
copy to streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/LongEnvironmentVariable.java
index a75fbb0446..b7f9cb9ede 100644
--- a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/LongEnvironmentVariable.java
@@ -16,9 +16,16 @@
  *
  */
 
-package org.apache.streampipes.messaging;
+package org.apache.streampipes.commons.environment.variable;
 
-public interface InternalEventProcessor<T> {
+public class LongEnvironmentVariable extends EnvironmentVariable<Long> {
 
-  void onEvent(T event);
+  public LongEnvironmentVariable(org.apache.streampipes.commons.constants.Envs envVariable) {
+    super(envVariable);
+  }
+
+  @Override
+  public Long parse(String value) {
+    return Long.parseLong(value);
+  }
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/SpHttpErrorStatusCode.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/SpHttpErrorStatusCode.java
index a4188e9664..9fa784feaa 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/SpHttpErrorStatusCode.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/SpHttpErrorStatusCode.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *  http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceStats.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceStats.java
index 5d4f3a8398..7fe693ecd6 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceStats.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceStats.java
@@ -20,8 +20,6 @@ package org.apache.streampipes.commons.prometheus.service;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 /**
  * Service Statistics
  */
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
new file mode 100644
index 0000000000..b1608b004e
--- /dev/null
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.prometheus.spmemorymanager;
+
+import org.apache.streampipes.commons.prometheus.StreamPipesCollectorRegistry;
+
+import io.prometheus.client.Gauge;
+
+/**
+ * Memory Manager Metrics Manager
+ */
+public class SpMemoryManagerMetrics {
+    
+  public static final Gauge MEMORY_USED_BYTES = 
StreamPipesCollectorRegistry.registerGauge(
+        "sp_memory_used_bytes",
+        "Amount of memory used in bytes"
+  );
+
+  public static final Gauge MEMORY_ALLOCATION_RATE = 
StreamPipesCollectorRegistry.registerGauge(
+        "sp_memory_allocation_rate_bytes_per_second",
+        "Memory allocation rate in bytes per second"
+  );
+
+  public static void updateCoreMetrics(double memoryUsedBytes, double 
allocationRate) {
+    MEMORY_USED_BYTES.set(memoryUsedBytes);
+    MEMORY_ALLOCATION_RATE.set(allocationRate);
+  }
+}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerStats.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerStats.java
new file mode 100644
index 0000000000..b006b8e764
--- /dev/null
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerStats.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.prometheus.spmemorymanager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Memory Manager Statistics
+ */
+public class SpMemoryManagerStats {
+    
+  private static final Logger log = 
LoggerFactory.getLogger(SpMemoryManagerStats.class);
+
+  public double memoryUsedBytes = 0.0;
+  public double allocationRate = 0.0;
+
+  private final SpMemoryManagerMetrics metrics;
+
+  public SpMemoryManagerStats() {
+    this.metrics = new SpMemoryManagerMetrics();
+  }
+
+  /**
+   * Update all metrics with custom total memory
+   */
+  public void updateAllMetrics() {
+    SpMemoryManagerMetrics.updateCoreMetrics(memoryUsedBytes, allocationRate);
+  }
+
+  public double getMemoryUsedBytes() {
+    return memoryUsedBytes;
+  }
+
+  public void setMemoryUsedBytes(double memoryUsedBytes) {
+    this.memoryUsedBytes = memoryUsedBytes;
+  }
+
+  public double getAllocationRate() {
+    return allocationRate;
+  }
+
+  public void setAllocationRate(double allocationRate) {
+    this.allocationRate = allocationRate;
+  }
+
+  public SpMemoryManagerMetrics getMetrics() {
+    return metrics;
+  }
+}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
new file mode 100644
index 0000000000..0724cec394
--- /dev/null
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.prometheus.spratelimiter;
+
+import org.apache.streampipes.commons.prometheus.StreamPipesCollectorRegistry;
+
+import io.prometheus.client.Gauge;
+
+/**
+ * Rate Limiter Metrics Manager
+ */
+public class SpRateLimiterMetrics {
+    
+  public static final Gauge RATE_LIMITER_QUEUE_SIZE = 
StreamPipesCollectorRegistry.registerGauge(
+        "sp_rate_limiter_queue_size",
+        "Current size of the waiting queue"
+  );
+
+  public static final Gauge RATE_LIMITER_AVERAGE_WAIT_TIME = 
StreamPipesCollectorRegistry.registerGauge(
+        "sp_rate_limiter_average_wait_time_seconds",
+        "Average wait time for permit acquisition in seconds"
+  );
+
+  public static void updateCoreMetrics(double queueSize, double 
averageWaitTime) {
+    RATE_LIMITER_QUEUE_SIZE.set(queueSize);
+    RATE_LIMITER_AVERAGE_WAIT_TIME.set(averageWaitTime);
+  }
+}
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterStats.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterStats.java
new file mode 100644
index 0000000000..d7bf47a61f
--- /dev/null
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterStats.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.prometheus.spratelimiter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Rate Limiter Statistics
+ */
+public class SpRateLimiterStats {
+    
+  private static final Logger log = 
LoggerFactory.getLogger(SpRateLimiterStats.class);
+
+  public double queueSize = 0.0;
+  public double averageWaitTime = 0.0;
+
+  private final SpRateLimiterMetrics metrics;
+
+  public SpRateLimiterStats() {
+    this.metrics = new SpRateLimiterMetrics();
+  }
+
+  /**
+   * Update all metrics
+   */
+  public void updateAllMetrics() {
+    SpRateLimiterMetrics.updateCoreMetrics(queueSize, averageWaitTime);
+  }
+
+  public double getQueueSize() {
+    return queueSize;
+  }
+
+  public void setQueueSize(double queueSize) {
+    this.queueSize = queueSize;
+  }
+
+  public double getAverageWaitTime() {
+    return averageWaitTime;
+  }
+
+  public void setAverageWaitTime(double averageWaitTime) {
+    this.averageWaitTime = averageWaitTime;
+  }
+
+  public SpRateLimiterMetrics getMetrics() {
+    return metrics;
+  }
+}
diff --git a/streampipes-extensions-api/pom.xml 
b/streampipes-extensions-api/pom.xml
index 7239f49c37..c0893d94ed 100644
--- a/streampipes-extensions-api/pom.xml
+++ b/streampipes-extensions-api/pom.xml
@@ -38,6 +38,11 @@
             <artifactId>streampipes-commons</artifactId>
             <version>0.99.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-commons</artifactId>
+            <version>0.98.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-messaging</artifactId>
@@ -48,6 +53,16 @@
             <artifactId>streampipes-model</artifactId>
             <version>0.99.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-commons</artifactId>
+            <version>0.99.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
diff --git 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/limiter/SpRateLimiter.java
 
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/limiter/SpRateLimiter.java
new file mode 100644
index 0000000000..39ed6fab78
--- /dev/null
+++ 
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/limiter/SpRateLimiter.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.api.limiter;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
+import 
org.apache.streampipes.commons.prometheus.spratelimiter.SpRateLimiterStats;
+
+import com.google.common.util.concurrent.RateLimiter;
+import com.sun.management.OperatingSystemMXBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A singleton rate limiter implementation for StreamPipes extensions.
+ * This class provides rate limiting functionality using Google Guava's 
RateLimiter.
+ * It supports configurable permits per second and warmup periods.
+ */
+public enum SpRateLimiter {
+
+  INSTANCE;
+
+  private static final Logger LOG = LoggerFactory.getLogger(SpRateLimiter.class);
+
+  // Unit in which the warm-up period read from the environment is interpreted.
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
+  private final Environment env = Environments.getEnvironment();
+
+  // Assigned by createRateLimiter()/reset() and read by every thread calling limit();
+  // volatile makes the assignment visible across threads.
+  private volatile RateLimiter rateLimiter;
+
+  // Average wait in seconds; written by limit() callers, read by the scheduler thread.
+  private volatile double rateLimiterAverageWaitTime = 0.0;
+
+  // Running sum of wait times (milliseconds) and the number of samples behind it.
+  private long totalWaitTime = 0L;
+  private final AtomicInteger waitTimeCount = new AtomicInteger(0);
+
+  // Number of callers currently inside limit().
+  private final AtomicInteger currentQueueSize = new AtomicInteger(0);
+
+  // Latest metrics snapshot; replaced by the scheduler thread and read via getStats().
+  private volatile SpRateLimiterStats stats;
+  private static volatile boolean schedulerInitialized = false;
+  private static ScheduledExecutorService scheduler;
+
+  /**
+   * Creates the rate limiter with default settings and starts the metrics scheduler.
+   * The permits-per-second value comes from {@link #setPermit()}; the warm-up period is
+   * read from the environment and interpreted in {@code DEFAULT_TIME_UNIT} (milliseconds).
+   * If a limiter already exists, the three-argument overload keeps it and only logs a
+   * warning, but the info message at the end of this method is still written.
+   */
+  public void createRateLimiter() {
+    var defaultPermits = setPermit();
+    var defaultWarmupPeriod = 
env.getRateLimiterDefaultWarmupPeriod().getValueOrDefault();
+    createRateLimiter(defaultPermits, defaultWarmupPeriod, DEFAULT_TIME_UNIT);
+    initScheduledTasks();
+    LOG.info("RateLimiter created with {} permits/sec and scheduler 
initialized", defaultPermits);
+  }
+
+  /**
+   * Creates the rate limiter with the given permits per second and starts the metrics
+   * scheduler. The warm-up period is read from the environment and interpreted in
+   * {@code DEFAULT_TIME_UNIT} (milliseconds).
+   * If a limiter already exists, the three-argument overload keeps it and only logs a
+   * warning, but the info message at the end of this method is still written.
+   *
+   * @param permitsPerSecond The number of permits per second
+   */
+  public void createRateLimiter(double permitsPerSecond) {
+    var defaultWarmupPeriod = 
env.getRateLimiterDefaultWarmupPeriod().getValueOrDefault();
+    createRateLimiter(permitsPerSecond, defaultWarmupPeriod, 
DEFAULT_TIME_UNIT);
+    initScheduledTasks();
+    LOG.info("RateLimiter created with {} permits/sec and scheduler 
initialized", permitsPerSecond);
+  }
+
+  /**
+   * Starts the single-threaded scheduler that runs {@link #scheduledTask()} at a fixed
+   * rate. The initial delay and the period are read from the environment and are given
+   * in seconds. Only the first call starts a scheduler; later calls find
+   * {@code schedulerInitialized} set and do nothing.
+   */
+  public void initScheduledTasks() {
+    var schedulerInitialDelay = 
env.getRateLimiterSchedulerInitialDelaySeconds().getValueOrDefault();
+    var schedulerPeriod = 
env.getRateLimiterSchedulerPeriodSeconds().getValueOrDefault();
+    if (!schedulerInitialized) {
+      // Re-check under the class lock so that concurrent callers create only one scheduler.
+      synchronized (SpRateLimiter.class) {
+        if (!schedulerInitialized) {
+          scheduler = Executors.newSingleThreadScheduledExecutor();
+          scheduler.scheduleAtFixedRate(this::scheduledTask, 
schedulerInitialDelay, schedulerPeriod, TimeUnit.SECONDS);
+          schedulerInitialized = true;
+        }
+      }
+    }
+  }
+
+  /**
+   * Publishes the current average wait time and queue size to the metrics layer.
+   * This method is run periodically by the scheduler started in initScheduledTasks().
+   * Runtime failures are logged instead of propagated, because an exception escaping
+   * a task scheduled with scheduleAtFixedRate cancels all later executions.
+   */
+  public void scheduledTask() {
+    try {
+      var snapshot = new SpRateLimiterStats();
+      snapshot.setAverageWaitTime(this.rateLimiterAverageWaitTime);
+      snapshot.setQueueSize(this.currentQueueSize.get());
+      snapshot.updateAllMetrics();
+      // Publish only after the snapshot is complete so getStats() never sees a partial one.
+      this.stats = snapshot;
+    } catch (RuntimeException e) {
+      LOG.warn("Failed to update rate limiter metrics", e);
+    }
+  }
+
+  /**
+   * Builds the underlying Guava rate limiter unless one already exists.
+   * An existing limiter is kept as it is and a warning is logged instead.
+   *
+   * @param permitsPerSecond The number of permits per second
+   * @param warmupPeriod     The warmup period
+   * @param unit             The time unit for the warmup period
+   * @throws IllegalArgumentException if parameters are invalid
+   */
+  public void createRateLimiter(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
+    if (this.rateLimiter != null) {
+      LOG.warn("RateLimiter already exists. Use setRate() to modify the rate instead.");
+      return;
+    }
+
+    validateParameters(permitsPerSecond, warmupPeriod, unit);
+    this.rateLimiter = RateLimiter.create(permitsPerSecond, warmupPeriod, unit);
+    LOG.info("RateLimiter created with {} permits per second, warmup period: {} {}",
+        permitsPerSecond, warmupPeriod, unit);
+  }
+
+  /**
+   * Acquires a single permit from the rate limiter, waiting at most the configured timeout.
+   * Each request consumes exactly 1 permit regardless of data size, so throttling is based
+   * on the number of requests rather than on the number of bytes.
+   *
+   * @param bytes The number of bytes to process (for logging purposes only)
+   * @return true if the permit was acquired; false if the timeout elapsed or the rate
+   *         limiter has not been created yet
+   * @throws InterruptedException kept in the signature for callers that already handle it.
+   *         NOTE(review): Guava's tryAcquire appears to wait uninterruptibly, so the
+   *         current body is not expected to throw it - confirm before removing it.
+   */
+  public boolean limit(long bytes) throws InterruptedException {
+    // Work on a local reference so a concurrent reset() cannot null the field mid-call.
+    RateLimiter limiter = this.rateLimiter;
+    if (limiter == null) {
+      LOG.warn("RateLimiter has not been initialized. Please call createRateLimiter() first.");
+      return false;
+    }
+
+    long timeoutMs = env.getRateLimiterTimeoutMs().getValueOrDefault();
+    long startTime = System.currentTimeMillis();
+
+    currentQueueSize.incrementAndGet();
+    try {
+      // Always request one permit. Casting the byte count to int overflowed for large
+      // payloads and made tryAcquire reject empty payloads (permits must be positive).
+      boolean acquired = limiter.tryAcquire(1, timeoutMs, TimeUnit.MILLISECONDS);
+
+      long waitTime = System.currentTimeMillis() - startTime;
+      updateAverageWaitTime(waitTime);
+
+      if (!acquired) {
+        LOG.warn("Failed to acquire permit for {} bytes within {} ms timeout (rate: {} requests/sec)",
+            bytes, timeoutMs, limiter.getRate());
+      } else {
+        LOG.debug("Successfully acquired permit for {} bytes in {} ms (rate: {} requests/sec)",
+            bytes, waitTime, limiter.getRate());
+      }
+      return acquired;
+    } finally {
+      // Leave the queue-size gauge balanced on every exit path.
+      currentQueueSize.decrementAndGet();
+    }
+  }
+
+  /**
+   * Calculates the default permits-per-second value as the total memory size reported
+   * by the operating-system MXBean multiplied by the configured percentage factor.
+   * Despite its name this method does not modify any state; it only returns the value.
+   * NOTE(review): the cast assumes the platform bean implements
+   * {@code com.sun.management.OperatingSystemMXBean} - confirm for all supported JVMs.
+   *
+   * @return The calculated number of permits per second
+   */
+  public static double setPermit() {
+    var permitsSetPercentage = 
Environments.getEnvironment().getRateLimiterPermitsSetPercentage().getValueOrDefault();
+    OperatingSystemMXBean systemMXBean = (OperatingSystemMXBean) 
java.lang.management.ManagementFactory.getOperatingSystemMXBean();
+    return systemMXBean.getTotalMemorySize() * permitsSetPercentage;
+  }
+
+  /**
+   * Changes the rate of the existing rate limiter.
+   *
+   * @param permitsPerSecond The new rate in permits per second
+   * @throws IllegalStateException if the rate limiter is not initialized
+   */
+  public void setRate(double permitsPerSecond) {
+    if (this.rateLimiter == null) {
+      throw new IllegalStateException("RateLimiter has not been initialized.");
+    }
+    this.rateLimiter.setRate(permitsPerSecond);
+    LOG.info("RateLimiter rate updated to {} permits per second", permitsPerSecond);
+  }
+
+  /**
+   * Returns the rate the limiter is currently configured with.
+   *
+   * @return The current rate in permits per second
+   * @throws IllegalStateException if the rate limiter is not initialized
+   */
+  public double getRate() {
+    if (this.rateLimiter == null) {
+      throw new IllegalStateException("RateLimiter has not been initialized.");
+    }
+    return this.rateLimiter.getRate();
+  }
+
+  /**
+   * Checks if the rate limiter has been initialized.
+   *
+   * @return true if initialized, false otherwise
+   */
+  public boolean isInitialized() {
+    return this.rateLimiter != null;
+  }
+
+  /**
+   * Drops the current rate limiter so that the instance is uninitialized again.
+   * Does nothing when no rate limiter exists.
+   */
+  public void reset() {
+    if (this.rateLimiter == null) {
+      return;
+    }
+    this.rateLimiter = null;
+    LOG.info("RateLimiter has been reset");
+  }
+
+  /**
+   * Gets the rate limiter queue size metric.
+   *
+   * @return The current queue size
+   */
+  public int getCurrentQueueSize() {
+    return currentQueueSize.get();
+  }
+
+  /**
+   * Gets the rate limiter average wait time metric.
+   *
+   * @return The average wait time in seconds
+   */
+  public double getRateLimiterAverageWaitTime() {
+    return rateLimiterAverageWaitTime;
+  }
+
+  /**
+   * Sets the rate limiter average wait time metric.
+   *
+   * @param averageWaitTime The average wait time in seconds
+   */
+  public void setRateLimiterAverageWaitTime(double averageWaitTime) {
+    this.rateLimiterAverageWaitTime = averageWaitTime;
+  }
+
+
+  /**
+   * Validates the arguments used to create the rate limiter.
+   *
+   * @param permitsPerSecond must be greater than zero
+   * @param warmupPeriod     must not be negative
+   * @param unit             must not be null
+   * @throws IllegalArgumentException if any argument violates its constraint
+   */
+  private void validateParameters(double permitsPerSecond, long warmupPeriod, 
TimeUnit unit) {
+    if (permitsPerSecond <= 0) {
+      throw new IllegalArgumentException("permitsPerSecond must be positive, 
got: " + permitsPerSecond);
+    }
+    if (warmupPeriod < 0) {
+      throw new IllegalArgumentException("warmupPeriod must be non-negative, 
got: " + warmupPeriod);
+    }
+    if (unit == null) {
+      throw new IllegalArgumentException("TimeUnit cannot be null");
+    }
+  }
+
+  /**
+   * Adds one wait-time sample and recomputes the average wait time in seconds.
+   * Once the number of samples exceeds the configured threshold, the running sum and
+   * the sample count are both scaled down by factor/divisor, so that the average stays
+   * continuous while older samples lose weight.
+   * The method is synchronized because limit() may be called from several threads and
+   * the sum, the count and the average must be updated together.
+   *
+   * @param waitTimeMs the time the caller waited for its permit, in milliseconds
+   */
+  private synchronized void updateAverageWaitTime(long waitTimeMs) {
+    var statsResetThreshold = env.getRateLimiterStatsResetThreshold().getValueOrDefault();
+    var statsResetFactor = env.getRateLimiterStatsResetFactor().getValueOrDefault();
+    var statsResetDivisor = env.getRateLimiterStatsResetDivisor().getValueOrDefault();
+
+    totalWaitTime += waitTimeMs;
+    int currentCount = waitTimeCount.incrementAndGet();
+
+    // Milliseconds -> seconds.
+    rateLimiterAverageWaitTime = (double) totalWaitTime / currentCount / 1000.0;
+
+    if (currentCount > statsResetThreshold) {
+      totalWaitTime = totalWaitTime * statsResetFactor / statsResetDivisor;
+      // Scale the count by the same ratio as the sum; widen to long to avoid int overflow.
+      waitTimeCount.set((int) ((long) currentCount * statsResetFactor / statsResetDivisor));
+    }
+  }
+
+  /**
+   * Returns the stats object most recently assigned by {@link #scheduledTask()}.
+   *
+   * @return the latest stats object, or null if the scheduled task has not run yet
+   */
+  public SpRateLimiterStats getStats() {
+    return stats;
+  }
+  
+  public void resetQueueSize() {
+    currentQueueSize.set(0);
+    LOG.info("Queue size has been reset");
+  }
+
+  /**
+   * Stops the metrics scheduler. Waits up to the configured number of seconds for the
+   * running task to finish and forces termination afterwards. If the waiting thread is
+   * interrupted, termination is forced and the interrupt flag is restored.
+   */
+  public void shutdown() {
+    var shutdownTimeoutSeconds =
+        env.getRateLimiterShutdownTimeoutSeconds().getValueOrDefault();
+    if (scheduler == null || scheduler.isShutdown()) {
+      return;
+    }
+
+    scheduler.shutdown();
+    try {
+      boolean terminated = scheduler.awaitTermination(shutdownTimeoutSeconds, TimeUnit.SECONDS);
+      if (!terminated) {
+        scheduler.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      scheduler.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.warn("RateLimiter scheduler shutdown interrupted", e);
+    }
+  }
+}
diff --git 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/memorymanager/SpMemoryManager.java
 
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/memorymanager/SpMemoryManager.java
new file mode 100644
index 0000000000..5a3836e332
--- /dev/null
+++ 
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/memorymanager/SpMemoryManager.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.api.memorymanager;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
+import 
org.apache.streampipes.commons.prometheus.spmemorymanager.SpMemoryManagerStats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * A singleton memory manager implementation for StreamPipes extensions.
+ * This class provides memory allocation and deallocation functionality with
+ * blocking behavior when insufficient memory is available.
+ */
+public enum SpMemoryManager {
+
+  INSTANCE;
+
+  private static final Logger LOG = LoggerFactory.getLogger(SpMemoryManager.class);
+  private final Environment env = Environments.getEnvironment();
+
+  // Remaining budget in bytes; assigned once in the constructor, then updated atomically.
+  private final AtomicLong freeMemory;
+
+  // Metric values written by allocating/freeing threads and read by the scheduler thread.
+  private volatile double memoryUsedBytes = 0.0;
+  private volatile double memoryAllocationRate = 0.0;
+
+  // Bookkeeping for the allocation-rate metric.
+  private long lastAllocationTime = System.currentTimeMillis();
+  private long totalAllocatedBytes = 0L;
+
+  // Memory control state
+  private volatile boolean memoryBlocked = false;
+  private volatile boolean memoryWarningActive = false;
+
+  // Latest metrics snapshot; replaced by the scheduler thread and read via getStats().
+  private volatile SpMemoryManagerStats stats = new SpMemoryManagerStats();
+  private static volatile boolean schedulerInitialized = false;
+  private static ScheduledExecutorService scheduler;
+
+  /**
+   * Initializes the free-memory budget with the configured initial memory value and
+   * starts the periodic task via {@link #initScheduledTask()}.
+   */
+  SpMemoryManager() {
+    this.freeMemory = new 
AtomicLong(env.getMemoryManagerDefaultInitialMemory().getValueOrDefault());
+    initScheduledTask();
+  }
+
+  /**
+   * Starts the scheduler that runs {@link #scheduledTask()} at a fixed rate. The initial
+   * delay and the period are read from the environment and are given in seconds. Only
+   * the first call starts a scheduler; later calls find {@code schedulerInitialized}
+   * set and do nothing.
+   */
+  public void initScheduledTask() {
+    if (!schedulerInitialized) {
+      // Re-check under the class lock so that concurrent callers create only one scheduler.
+      synchronized (SpMemoryManager.class) {
+        if (!schedulerInitialized) {
+          scheduler = Executors.newScheduledThreadPool(1);
+          scheduler.scheduleAtFixedRate(this::scheduledTask, 
env.getMemorySchedulerInitialDelaySeconds().getValueOrDefault(),
+                  env.getMemorySchedulerPeriodSeconds().getValueOrDefault(),
+                  java.util.concurrent.TimeUnit.SECONDS);
+          schedulerInitialized = true;
+        }
+      }
+    }
+  }
+
+  /**
+   * Periodic task: publishes the allocation rate and the used memory to the metrics
+   * layer and then re-evaluates the memory thresholds.
+   * Both steps are guarded separately. An exception escaping a task scheduled with
+   * scheduleAtFixedRate cancels all later executions, and a metrics failure must not
+   * prevent the blocking state from being updated.
+   */
+  public void scheduledTask() {
+    try {
+      var snapshot = new SpMemoryManagerStats();
+      snapshot.setAllocationRate(this.getMemoryAllocationRate());
+      snapshot.setMemoryUsedBytes(this.getMemoryUsedBytes());
+      snapshot.updateAllMetrics();
+      // Publish only after the snapshot is complete so getStats() never sees a partial one.
+      this.stats = snapshot;
+    } catch (RuntimeException e) {
+      LOG.warn("Failed to update memory manager metrics", e);
+    }
+
+    try {
+      // Check memory usage thresholds
+      checkMemoryThresholds();
+    } catch (RuntimeException e) {
+      LOG.warn("Failed to evaluate memory thresholds", e);
+    }
+  }
+  
+  /**
+   * Re-evaluates the blocking and warning flags from the current usage ratio
+   * (allocated memory divided by the configured initial memory).
+   * Blocking starts when the ratio reaches the usage threshold and is lifted only once
+   * the ratio has fallen to the warning threshold or below. The warning flag is set
+   * while the ratio lies between the warning threshold and the usage threshold.
+   */
+  private void checkMemoryThresholds() {
+    double memoryUsageRatio = (double) getAllocatedMemory()
+        / env.getMemoryManagerDefaultInitialMemory().getValueOrDefault();
+    var usageThreshold = env.getMemoryManagerUsageThreshold().getValueOrDefault();
+    var warningThreshold = env.getMemoryWarningThreshold().getValueOrDefault();
+
+    boolean reachedUsageThreshold = memoryUsageRatio >= usageThreshold;
+    boolean atOrBelowWarning = memoryUsageRatio <= warningThreshold;
+
+    if (reachedUsageThreshold && !memoryBlocked) {
+      memoryBlocked = true;
+      LOG.warn("Memory usage reached {}% threshold. Blocking data consumption.",
+          (int) (usageThreshold * 100));
+    } else if (!reachedUsageThreshold && atOrBelowWarning && memoryBlocked) {
+      memoryBlocked = false;
+      LOG.info("Memory usage dropped below {}% threshold. Resuming data consumption.",
+          (int) (warningThreshold * 100));
+    }
+
+    // Update warning state
+    memoryWarningActive = memoryUsageRatio >= warningThreshold
+        && memoryUsageRatio < usageThreshold;
+  }
+
+  /**
+   * Allocates the specified amount of memory from the managed budget.
+   * If not enough memory is free, the calling thread waits and retries until the
+   * request fits or the thread is interrupted.
+   * The call returns WITHOUT allocating anything in three cases, each of which is only
+   * logged: the byte count is not positive, allocation is currently blocked by the
+   * usage threshold, or the thread is interrupted while waiting.
+   * NOTE(review): a request larger than the memory that can ever become free keeps
+   * waiting until the thread is interrupted - confirm callers never request that much.
+   *
+   * @param bytes The number of bytes to allocate
+   */
+  public void allocate(long bytes) {
+    if (bytes <= 0) {
+      LOG.warn("Attempted to allocate non-positive memory: {} bytes", bytes);
+      return;
+    }
+
+    // Check if memory is blocked due to threshold
+    if (memoryBlocked) {
+      LOG.warn("Memory allocation blocked due to high memory usage threshold. "
+              + "Current usage: {}%", getMemoryUsagePercentage());
+      return;
+    }
+
+    var initialMemory = env.getMemoryManagerDefaultInitialMemory().getValueOrDefault();
+    var waitTimeoutMs = env.getMemoryManagerWaitTimeoutMs().getValueOrDefault();
+    boolean shortageLogged = false;
+
+    // Loop until enough memory is available
+    while (true) {
+      long currentFree = freeMemory.get();
+
+      if (currentFree >= bytes) {
+        long newFreeMemory = currentFree - bytes;
+        // Try to atomically update the free memory; on a lost race simply retry.
+        if (freeMemory.compareAndSet(currentFree, newFreeMemory)) {
+          long allocatedMemory = initialMemory - newFreeMemory;
+          memoryUsedBytes = (double) allocatedMemory;
+          updateAllocationRate(bytes);
+          return;
+        }
+        continue;
+      }
+
+      // Insufficient memory: log once per call instead of once per retry round.
+      if (!shortageLogged) {
+        LOG.warn("Not enough free memory to allocate {} bytes. Current free memory: {} bytes. "
+            + "Blocking allocation.", bytes, currentFree);
+        shortageLogged = true;
+      }
+
+      // Wait in 1ms steps, but stop as soon as enough memory has been freed rather
+      // than always sleeping for the full timeout.
+      long startTime = System.currentTimeMillis();
+      while (freeMemory.get() < bytes
+          && System.currentTimeMillis() - startTime < waitTimeoutMs) {
+        LockSupport.parkNanos(1_000_000); // 1ms
+        if (Thread.currentThread().isInterrupted()) {
+          LOG.warn("Memory allocation blocking was interrupted");
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the specified amount of memory to the managed budget and refreshes the
+   * used-memory metric.
+   * A non-positive byte count is only logged and otherwise ignored; no exception is
+   * thrown. This method does not signal waiting threads.
+   * NOTE(review): the free amount is not capped, so freeing more than was allocated
+   * makes the allocated value negative - confirm callers always pair allocate/free.
+   *
+   * @param bytes The number of bytes to free
+   */
+  public void free(long bytes) {
+    if (bytes <= 0) {
+      LOG.warn("Attempted to free non-positive memory: {} bytes", bytes);
+      return;
+    }
+
+    long newFreeMemory = freeMemory.addAndGet(bytes);
+    
+    long allocatedMemory = 
env.getMemoryManagerDefaultInitialMemory().getValueOrDefault() - newFreeMemory;
+    memoryUsedBytes = (double) allocatedMemory;
+  }
+
+  /**
+   * Gets the current amount of free memory.
+   *
+   * @return The current free memory in bytes
+   */
+  public long getFreeMemory() {
+    return freeMemory.get();
+  }
+
+  /**
+   * Gets the total allocated memory (initial memory - free memory).
+   *
+   * @return The total allocated memory in bytes
+   */
+  public long getAllocatedMemory() {
+    return env.getMemoryManagerDefaultInitialMemory().getValueOrDefault() - 
freeMemory.get();
+  }
+
+  /**
+   * Gets the memory used bytes metric.
+   *
+   * @return The memory used in bytes
+   */
+  public double getMemoryUsedBytes() {
+    return memoryUsedBytes;
+  }
+
+  /**
+   * Sets the memory used bytes metric.
+   *
+   * @param usedBytes The memory used in bytes
+   */
+  public void setMemoryUsedBytes(double usedBytes) {
+    this.memoryUsedBytes = usedBytes;
+  }
+
+  /**
+   * Gets the memory allocation rate metric.
+   *
+   * @return The allocation rate in bytes per second
+   */
+  public double getMemoryAllocationRate() {
+    return memoryAllocationRate;
+  }
+
+  /**
+   * Sets the memory allocation rate metric.
+   *
+   * @param allocationRate The allocation rate in bytes per second
+   */
+  public void setMemoryAllocationRate(double allocationRate) {
+    this.memoryAllocationRate = allocationRate;
+  }
+
+  
+  // Bytes allocated since the allocation rate was last recomputed.
+  private long bytesSinceLastRateUpdate = 0L;
+
+  /**
+   * Records an allocation and refreshes the allocation-rate metric (bytes per second).
+   * The rate is the number of bytes allocated since the previous rate update divided
+   * by the time elapsed since then. Allocations that happen within the same
+   * millisecond are accumulated and enter the next rate update.
+   * The method is synchronized because allocate() may be called from several threads
+   * and the counters must be updated together.
+   *
+   * @param allocatedBytes the number of bytes that were just allocated
+   */
+  private synchronized void updateAllocationRate(long allocatedBytes) {
+    long currentTime = System.currentTimeMillis();
+    long timeDiff = currentTime - lastAllocationTime;
+
+    // Count every allocation, including those with no measurable time difference.
+    totalAllocatedBytes += allocatedBytes;
+    bytesSinceLastRateUpdate += allocatedBytes;
+
+    if (timeDiff > 0) {
+      // Milliseconds -> seconds.
+      memoryAllocationRate = bytesSinceLastRateUpdate / (timeDiff / 1000.0);
+      bytesSinceLastRateUpdate = 0L;
+    }
+
+    lastAllocationTime = currentTime;
+  }
+
+    public long getTotalAllocatedBytes() {
+        return totalAllocatedBytes;
+    }
+
+    public SpMemoryManagerStats getStats() {
+        return stats;
+    }
+    
+    /**
+     * Gets the current memory usage as a fraction of the configured initial memory.
+     * The value is a ratio, not a percentage: multiply it by 100 to obtain percent.
+     *
+     * @return allocated memory divided by the configured initial memory
+     */
+    public double getMemoryUsagePercentage() {
+        return (double) getAllocatedMemory() / 
env.getMemoryManagerDefaultInitialMemory().getValueOrDefault();
+    }
+    
+    /**
+     * Checks if memory allocation is currently blocked due to high usage.
+     *
+     * @return true if memory allocation is blocked, false otherwise
+     */
+    public boolean isMemoryBlocked() {
+        return memoryBlocked;
+    }
+    
+    /**
+     * Checks if memory warning is currently active.
+     *
+     * @return true if memory warning is active, false otherwise
+     */
+    public boolean isMemoryWarningActive() {
+        return memoryWarningActive;
+    }
+    
+    /**
+     * Gets the current memory usage in MB.
+     *
+     * @return The memory usage in MB
+     */
+    public double getMemoryUsageMB() {
+        return getAllocatedMemory() / (double) 
env.getMemoryBytesToMb().getValueOrDefault();
+    }
+    
+    /**
+     * Gets the total available memory in MB.
+     *
+     * @return The total available memory in MB
+     */
+    public double getTotalMemoryMB() {
+        return env.getMemoryManagerDefaultInitialMemory().getValueOrDefault()
+                / (double) env.getMemoryBytesToMb().getValueOrDefault();
+    }
+    
+    /**
+     * Stops the scheduler. Waits up to the configured number of seconds for the running
+     * task to finish and forces termination afterwards. If the waiting thread is
+     * interrupted, termination is forced and the interrupt flag is restored.
+     */
+    public static void shutdown() {
+        if (scheduler == null || scheduler.isShutdown()) {
+            return;
+        }
+
+        scheduler.shutdown();
+        try {
+            var timeoutSeconds = Environments.getEnvironment()
+                    .getMemoryManagerShutdownTimeoutSeconds()
+                    .getValueOrDefault();
+            boolean terminated =
+                    scheduler.awaitTermination(timeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
+            if (!terminated) {
+                scheduler.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            scheduler.shutdownNow();
+            Thread.currentThread().interrupt();
+            LOG.warn("Memory manager scheduler shutdown was interrupted", e);
+        }
+    }
+}
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/HttpServerAdapterManagement.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/HttpServerAdapterManagement.java
index 64b6c2c510..9529810102 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/HttpServerAdapterManagement.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/HttpServerAdapterManagement.java
@@ -41,7 +41,7 @@ public enum HttpServerAdapterManagement {
     this.httpServerAdapters.remove(endpointId);
   }
 
-  public void notify(String endpointId, byte[] event) throws 
IllegalArgumentException {
+  public void notify(String endpointId, byte[] event) throws 
IllegalArgumentException, InterruptedException {
     if (httpServerAdapters.containsKey(endpointId)) {
       httpServerAdapters.get(endpointId).onEvent(event);
     } else {
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ServiceLoadDataReportGenerator.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ServiceLoadDataReportGenerator.java
index 35035fab3c..9503486e96 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ServiceLoadDataReportGenerator.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ServiceLoadDataReportGenerator.java
@@ -211,6 +211,7 @@ public class ServiceLoadDataReportGenerator {
       // Update service stats
       serviceStats.setCpuUsage(currentSnapshot.getCpu().getUsage());
       serviceStats.setMemoryUsage(currentSnapshot.getMemory().getUsage());
+      serviceStats.setWeight(newReport.getWeight());
       serviceStats.setSystemLoad(currentSnapshot.getAverageUsagePercent());
       
serviceStats.setHistoricalSystemLoad(historicalSnapshot.getAverageUsagePercent());
       serviceStats.updateAllMetrics();
diff --git 
a/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/adapter/RocketMQConsumer.java
 
b/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/adapter/RocketMQConsumer.java
index d062340782..11620e9881 100644
--- 
a/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/adapter/RocketMQConsumer.java
+++ 
b/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/adapter/RocketMQConsumer.java
@@ -22,11 +22,14 @@ import 
org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.slf4j.Logger;
 
 import java.io.IOException;
 
 public class RocketMQConsumer implements Runnable {
 
+  Logger logger = org.slf4j.LoggerFactory.getLogger(RocketMQConsumer.class);
+
   private InternalEventProcessor<byte[]> eventProcessor;
   private String brokerUrl;
   private String topic;
@@ -48,7 +51,12 @@ public class RocketMQConsumer implements Runnable {
   public void run() {
     try {
       this.consumer = RocketMQUtils.createConsumer(brokerUrl, topic, 
consumerGroup, messageView -> {
-        eventProcessor.onEvent(messageView.getBody().array());
+        try {
+          eventProcessor.onEvent(messageView.getBody().array());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          logger.warn("Event processing was interrupted", e);
+          // The event was not processed, so report a failure instead of falling through
+          // to the SUCCESS result below, which would acknowledge an unprocessed message.
+          return ConsumeResult.FAILURE;
+        }
         return ConsumeResult.SUCCESS;
       });
     } catch (ClientException e) {
diff --git 
a/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/sink/RocketMQPublisherSink.java
 
b/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/sink/RocketMQPublisherSink.java
index 4e1df94272..b46da76632 100644
--- 
a/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/sink/RocketMQPublisherSink.java
+++ 
b/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/sink/RocketMQPublisherSink.java
@@ -19,8 +19,8 @@ package 
org.apache.streampipes.extensions.connectors.rocketmq.sink;
 
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
 import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
 import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
diff --git 
a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
 
b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
index 3cef5b98fc..80a6a4a52e 100644
--- 
a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
+++ 
b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
@@ -25,6 +25,7 @@ import 
org.apache.streampipes.model.grounding.JmsTransportProtocol;
 
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.util.ByteSequence;
+import org.slf4j.Logger;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
@@ -37,6 +38,8 @@ public class ActiveMQConsumer extends 
ActiveMQConnectionProvider implements
     EventConsumer,
     AutoCloseable, Serializable {
 
+  Logger logger = org.slf4j.LoggerFactory.getLogger(ActiveMQConsumer.class);
+
   private Session session;
   private MessageConsumer consumer;
   private InternalEventProcessor<byte[]> eventProcessor;
@@ -52,7 +55,12 @@ public class ActiveMQConsumer extends 
ActiveMQConnectionProvider implements
       consumer.setMessageListener(message -> {
         if (message instanceof BytesMessage) {
           ByteSequence bs = ((ActiveMQBytesMessage) message).getContent();
-          eventProcessor.onEvent(bs.getData());
+            try {
+                eventProcessor.onEvent(bs.getData());
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              logger.warn("Event processing was interrupted", e);
+            }
         }
 
       });
diff --git 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index 8413452765..ec2adcb80b 100644
--- 
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++ 
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -74,7 +74,14 @@ public class SpKafkaConsumer implements EventConsumer, 
Runnable,
     Duration duration = Duration.of(100, ChronoUnit.MILLIS);
     while (isRunning) {
       ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
-      records.forEach(record -> eventProcessor.onEvent(record.value()));
+      records.forEach(record -> {
+          try {
+              eventProcessor.onEvent(record.value());
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Event processing interrupted in KafkaConsumer", e);
+          }
+      });
     }
     consumer.close();
   }
diff --git 
a/streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarConsumer.java
 
b/streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarConsumer.java
index cd2cfaa856..9336489a1d 100644
--- 
a/streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarConsumer.java
+++ 
b/streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarConsumer.java
@@ -28,9 +28,12 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
 
 public class PulsarConsumer implements EventConsumer {
 
+  Logger logger = org.slf4j.LoggerFactory.getLogger(PulsarConsumer.class);
+
   private PulsarClient pulsarClient;
   private Consumer<byte[]> consumer;
   private PulsarTransportProtocol protocolSettings;
@@ -57,7 +60,12 @@ public class PulsarConsumer implements EventConsumer {
           .messageListener(new MessageListener<byte[]>() {
             @Override
             public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
-              eventProcessor.onEvent(msg.getData());
+                try {
+                    eventProcessor.onEvent(msg.getData());
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  logger.warn("Event processing interrupted in PulsarConsumer", e);
+                }
             }
           })
           .subscribe();
diff --git a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java
index a75fbb0446..5f1f878551 100644
--- a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java
+++ b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java
@@ -20,5 +20,5 @@ package org.apache.streampipes.messaging;
 
 public interface InternalEventProcessor<T> {
 
-  void onEvent(T event);
+  void onEvent(T event) throws InterruptedException;
 }
diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
index eee0c27d59..d25b2b45c1 100644
--- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
+++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.service.extensions;
 
 import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.extensions.api.limiter.SpRateLimiter;
 import org.apache.streampipes.extensions.api.migration.IModelMigrator;
 import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
 import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
@@ -81,6 +82,7 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServic
       String serviceId = serviceDef.getServiceGroup() + "-" + AUTO_GENERATED_SERVICE_ID;
       serviceDef.setServiceId(serviceId);
       DeclarersSingleton.getInstance().populate(networkingConfig.getHost(), networkingConfig.getPort(), serviceDef);
+      SpRateLimiter.INSTANCE.createRateLimiter();
       startExtensionsService(this.getClass(), serviceDef, networkingConfig);
       ServiceLoadDataReportGenerator.getInstance().initialize();
     } catch (UnknownHostException e) {
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index 8ba006df54..82e58d0587 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -109,10 +109,8 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
   public void process(Map<String, Object> rawEvent, long size, String topicName) {
     try {
       var sourceInfo = sourceInfoMapper.get(topicName);
-
       var event = EventFactory
           .fromMap(rawEvent, sourceInfo, schemaInfoMapper.get(topicName));
-
       this.onEvent(event, sourceInfo.getSourceId());
       increaseCounter(sourceInfo.getSourceId(), size);
     } catch (RuntimeException e) {
diff --git 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
index 6025ece466..947e28f0be 100644
--- 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
+++ 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
@@ -19,6 +19,8 @@
 package org.apache.streampipes.wrapper.standalone.routing;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.limiter.SpRateLimiter;
+import org.apache.streampipes.extensions.api.memorymanager.SpMemoryManager;
 import org.apache.streampipes.extensions.api.pe.routing.RawDataProcessor;
 import org.apache.streampipes.extensions.api.pe.routing.SpInputCollector;
 import org.apache.streampipes.messaging.EventConsumer;
@@ -42,12 +44,15 @@ public class StandaloneSpInputCollector<T extends TransportProtocol> extends
   }
 
   @Override
-  public void onEvent(byte[] event) {
+  public void onEvent(byte[] event) throws InterruptedException {
+    SpRateLimiter.INSTANCE.limit(event.length);
+    SpMemoryManager.INSTANCE.allocate(event.length);
     if (singletonEngine) {
       send(consumers.get(consumers.keySet().toArray()[0]), event);
     } else {
       consumers.forEach((key, value) -> send(value, event));
     }
+    SpMemoryManager.INSTANCE.free(event.length);
   }
 
   private void send(RawDataProcessor rawDataProcessor, byte[] event) {
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
index 272081f768..aebf71046a 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
@@ -20,6 +20,8 @@ package org.apache.streampipes.wrapper.standalone.runtime;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor;
+import org.apache.streampipes.extensions.api.limiter.SpRateLimiter;
+import org.apache.streampipes.extensions.api.memorymanager.SpMemoryManager;
 import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
 import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
@@ -64,6 +66,8 @@ public class StandaloneEventProcessorRuntime extends StandalonePipelineElementRu
   @Override
   public void process(Map<String, Object> rawEvent, long size, String sourceInfo) {
     try {
+      SpRateLimiter.INSTANCE.limit(size);
+      SpMemoryManager.INSTANCE.allocate(size);
       monitoringManager.increaseInCounter(instanceId, sourceInfo, size, System.currentTimeMillis());
       var event = this.internalRuntimeParameters.makeEvent(runtimeParameters, rawEvent, sourceInfo);
       pipelineElement
@@ -74,6 +78,11 @@ public class StandaloneEventProcessorRuntime extends StandalonePipelineElementRu
     } catch (RuntimeException e) {
       LOG.error("RuntimeException while processing event in {}", pipelineElement.getClass().getCanonicalName(), e);
       addLogEntry(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Event processing interrupted", e);
+    } finally {
+      SpMemoryManager.INSTANCE.free(size);
     }
   }
 
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
index e87d3cd930..859468a761 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
@@ -20,6 +20,8 @@ package org.apache.streampipes.wrapper.standalone.runtime;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.limiter.SpRateLimiter;
+import org.apache.streampipes.extensions.api.memorymanager.SpMemoryManager;
 import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
 import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
 import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
@@ -51,11 +53,18 @@ public class StandaloneEventSinkRuntime extends StandalonePipelineElementRuntime
   @Override
   public void process(Map<String, Object> rawEvent, long size, String sourceInfo) {
     try {
+      SpRateLimiter.INSTANCE.limit(size);
+      SpMemoryManager.INSTANCE.allocate(size);
       monitoringManager.increaseInCounter(instanceId, sourceInfo, size, System.currentTimeMillis());
       pipelineElement.onEvent(internalRuntimeParameters.makeEvent(runtimeParameters, rawEvent, sourceInfo));
     } catch (RuntimeException e) {
       LOG.error("RuntimeException while processing event in {}", pipelineElement.getClass().getCanonicalName(), e);
       addLogEntry(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Event processing interrupted in {}", pipelineElement.getClass().getCanonicalName(), e);
+    } finally {
+      SpMemoryManager.INSTANCE.free(size);
     }
   }
 

Reply via email to