This is an automated email from the ASF dual-hosted git repository.
zhenyu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 7ce00b2233 feat:Add SpRateLimiter and SpMemoryManager (#3796)
7ce00b2233 is described below
commit 7ce00b2233a93b80ba70ee7a2d8207d08a14474c
Author: xinhao liu <[email protected]>
AuthorDate: Fri Oct 31 17:30:06 2025 +0800
feat:Add SpRateLimiter and SpMemoryManager (#3796)
---
pom.xml | 7 +
.../apache/streampipes/commons/constants/Envs.java | 26 +-
.../commons/environment/DefaultEnvironment.java | 89 +++++-
.../commons/environment/Environment.java | 27 +-
.../variable/LongEnvironmentVariable.java | 13 +-
.../commons/exceptions/SpHttpErrorStatusCode.java | 2 +-
.../prometheus/service/ElementServiceStats.java | 2 -
.../spmemorymanager/SpMemoryManagerMetrics.java | 44 +++
.../spmemorymanager/SpMemoryManagerStats.java | 66 ++++
.../spratelimiter/SpRateLimiterMetrics.java | 44 +++
.../spratelimiter/SpRateLimiterStats.java | 66 ++++
streampipes-extensions-api/pom.xml | 15 +
.../extensions/api/limiter/SpRateLimiter.java | 307 +++++++++++++++++++
.../api/memorymanager/SpMemoryManager.java | 331 +++++++++++++++++++++
.../connect/HttpServerAdapterManagement.java | 2 +-
.../monitoring/ServiceLoadDataReportGenerator.java | 1 +
.../rocketmq/adapter/RocketMQConsumer.java | 10 +-
.../rocketmq/sink/RocketMQPublisherSink.java | 2 +-
.../messaging/jms/ActiveMQConsumer.java | 10 +-
.../messaging/kafka/SpKafkaConsumer.java | 9 +-
.../messaging/pulsar/PulsarConsumer.java | 10 +-
.../messaging/InternalEventProcessor.java | 2 +-
.../StreamPipesExtensionsServiceBase.java | 2 +
.../standalone/function/StreamPipesFunction.java | 2 -
.../routing/StandaloneSpInputCollector.java | 7 +-
.../runtime/StandaloneEventProcessorRuntime.java | 9 +
.../runtime/StandaloneEventSinkRuntime.java | 9 +
27 files changed, 1092 insertions(+), 22 deletions(-)
diff --git a/pom.xml b/pom.xml
index c968f7e29e..367abf1482 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@
<eclipse.milo.version>0.6.16</eclipse.milo.version>
<error-prone.version>2.10.0</error-prone.version>
<file-management.version>3.1.0</file-management.version>
+ <findbugs.version>3.0.2</findbugs.version>
<flink.version>1.13.5</flink.version>
<fogsy-qudt.version>1.0</fogsy-qudt.version>
<geojson-jackson.version>1.14</geojson-jackson.version>
@@ -240,6 +241,12 @@
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>${findbugs.version}</version>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 42c5522d72..69b76952bd 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -170,10 +170,30 @@ public enum Envs {
SP_LOGGING_FILE_PREFIX("SP_LOGGING_FILE_PREFIX", "streampipes"),
SP_LOGGING_FILE_DIR("SP_LOGGING_FILE_DIR", "logs"),
SP_LOGGING_FILE_PATTERN(
- "SP_LOGGING_FILE_PATTERN", "%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36}
- %msg%n"
- );
-
+ "SP_LOGGING_FILE_PATTERN",
+ "%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"
+ ),
+ //SpRateLimiter
+
SP_RATE_LIMITER_DEFAULT_WARMUP_PERIOD("SP_RATE_LIMITER_DEFAULT_WARMUP_PERIOD",
"1000"),
+
SP_RATE_LIMITER_SCHEDULER_INITIAL_DELAY_SECONDS("SP_RATE_LIMITER_SCHEDULER_INITIAL_DELAY_SECONDS",
"0"),
+
SP_RATE_LIMITER_SCHEDULER_PERIOD_SECONDS("SP_RATE_LIMITER_SCHEDULER_PERIOD_SECONDS",
"15"),
+
SP_RATE_LIMITER_STATS_RESET_THRESHOLD("SP_RATE_LIMITER_STATS_RESET_THRESHOLD",
"1000"),
+ SP_RATE_LIMITER_STATS_RESET_FACTOR("SP_RATE_LIMITER_STATS_RESET_FACTOR",
"999"),
+ SP_RATE_LIMITER_STATS_RESET_DIVISOR("SP_RATE_LIMITER_STATS_RESET_DIVISOR",
"1000"),
+
SP_RATE_LIMITER_SHUTDOWN_TIMEOUT_SECONDS("SP_RATE_LIMITER_SHUTDOWN_TIMEOUT_SECONDS",
"5"),
+ SP_RATE_LIMITER_TIMEOUT_MS("SP_RATE_LIMITER_TIMEOUT_MS", "1000"),
+
SP_RATE_LIMITER_PERMITS_SET_PERCENTAGE("SP_RATE_LIMITER_PERMITS_SET_PERCENTAGE",
"0.7"),
+
+ //SpMemoryManager
+
SP_MEMORY_MANAGER_DEFAULT_INITIAL_MEMORY("SP_MEMORY_MANAGER_DEFAULT_INITIAL_MEMORY",
"1073741824"),
+ SP_MEMORY_MANAGER_WAIT_TIMEOUT_MS("SP_MEMORY_MANAGER_WAIT_TIMEOUT_MS",
"1000"),
+
SP_MEMORY_SCHEDULER_INITIAL_DELAY_SECONDS("SP_MEMORY_SCHEDULER_INITIAL_DELAY_SECONDS",
"0"),
+ SP_MEMORY_SCHEDULER_PERIOD_SECONDS("SP_MEMORY_SCHEDULER_PERIOD_SECONDS",
"15"),
+ SP_MEMORY_BYTES_TO_MB("SP_MEMORY_BYTES_TO_MB", "1048576"),
+
SP_MEMORY_MANAGER_SHUTDOWN_TIMEOUT_SECONDS("SP_MEMORY_MANAGER_SHUTDOWN_TIMEOUT_SECONDS",
"5"),
+ SP_MEMORY_MANAGER_USAGE_THRESHOLD("SP_MEMORY_MANAGER_USAGE_THRESHOLD",
"0.9"),
+ SP_MEMORY_WARNING_THRESHOLD("SP_MEMORY_WARNING_THRESHOLD", "0.8");
private final String envVariableName;
private String defaultValue;
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index e33ffd4199..b04aec1da8 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -22,9 +22,10 @@ import
org.apache.streampipes.commons.environment.model.OAuthConfiguration;
import
org.apache.streampipes.commons.environment.parser.OAuthConfigurationParser;
import
org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
import
org.apache.streampipes.commons.environment.variable.DoubleEnvironmentVariable;
+import
org.apache.streampipes.commons.environment.variable.FloatEnvironmentVariable;
import
org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import
org.apache.streampipes.commons.environment.variable.LongEnvironmentVariable;
import
org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
-import
org.apache.streampipes.commons.environment.variable.FloatEnvironmentVariable;
import java.util.List;
@@ -456,7 +457,6 @@ public class DefaultEnvironment implements Environment {
return new StringEnvironmentVariable(Envs.SP_RETENTION_LOCAL_DIR);
}
-
@Override
public DoubleEnvironmentVariable getDirMemoryResourceWeight() {
return new DoubleEnvironmentVariable(Envs.DIR_MEMORY_RESOURCE_WEIGHT);
@@ -517,6 +517,91 @@ public class DefaultEnvironment implements Environment {
return new BooleanEnvironmentVariable(Envs.LOAD_MANAGER_ENABLE);
}
+ @Override
+ public LongEnvironmentVariable getRateLimiterDefaultWarmupPeriod() {
+ return new
LongEnvironmentVariable(Envs.SP_RATE_LIMITER_DEFAULT_WARMUP_PERIOD);
+ }
+
+ @Override
+ public IntEnvironmentVariable getRateLimiterSchedulerInitialDelaySeconds() {
+ return new
IntEnvironmentVariable(Envs.SP_RATE_LIMITER_SCHEDULER_INITIAL_DELAY_SECONDS);
+ }
+
+ @Override
+ public IntEnvironmentVariable getRateLimiterSchedulerPeriodSeconds() {
+ return new
IntEnvironmentVariable(Envs.SP_RATE_LIMITER_SCHEDULER_PERIOD_SECONDS);
+ }
+
+ @Override
+ public IntEnvironmentVariable getRateLimiterStatsResetThreshold() {
+ return new
IntEnvironmentVariable(Envs.SP_RATE_LIMITER_STATS_RESET_THRESHOLD);
+ }
+
+ @Override
+ public IntEnvironmentVariable getRateLimiterStatsResetFactor() {
+ return new IntEnvironmentVariable(Envs.SP_RATE_LIMITER_STATS_RESET_FACTOR);
+ }
+
+ @Override
+ public IntEnvironmentVariable getRateLimiterStatsResetDivisor() {
+ return new
IntEnvironmentVariable(Envs.SP_RATE_LIMITER_STATS_RESET_DIVISOR);
+ }
+
+ @Override
+ public IntEnvironmentVariable getRateLimiterShutdownTimeoutSeconds() {
+ return new
IntEnvironmentVariable(Envs.SP_RATE_LIMITER_SHUTDOWN_TIMEOUT_SECONDS);
+ }
+
+ @Override
+ public LongEnvironmentVariable getRateLimiterTimeoutMs() {
+ return new LongEnvironmentVariable(Envs.SP_RATE_LIMITER_TIMEOUT_MS);
+ }
+
+ @Override
+ public DoubleEnvironmentVariable getRateLimiterPermitsSetPercentage() {
+ return new
DoubleEnvironmentVariable(Envs.SP_RATE_LIMITER_PERMITS_SET_PERCENTAGE);
+ }
+
+ @Override
+ public LongEnvironmentVariable getMemoryManagerDefaultInitialMemory() {
+ return new
LongEnvironmentVariable(Envs.SP_MEMORY_MANAGER_DEFAULT_INITIAL_MEMORY);
+ }
+
+ @Override
+ public LongEnvironmentVariable getMemoryManagerWaitTimeoutMs() {
+ return new LongEnvironmentVariable(Envs.SP_MEMORY_MANAGER_WAIT_TIMEOUT_MS);
+ }
+
+ @Override
+ public IntEnvironmentVariable getMemorySchedulerInitialDelaySeconds() {
+ return new
IntEnvironmentVariable(Envs.SP_MEMORY_SCHEDULER_INITIAL_DELAY_SECONDS);
+ }
+
+ @Override
+ public IntEnvironmentVariable getMemorySchedulerPeriodSeconds() {
+ return new IntEnvironmentVariable(Envs.SP_MEMORY_SCHEDULER_PERIOD_SECONDS);
+ }
+
+ @Override
+ public LongEnvironmentVariable getMemoryBytesToMb() {
+ return new LongEnvironmentVariable(Envs.SP_MEMORY_BYTES_TO_MB);
+ }
+
+ @Override
+ public IntEnvironmentVariable getMemoryManagerShutdownTimeoutSeconds() {
+ return new
IntEnvironmentVariable(Envs.SP_MEMORY_MANAGER_SHUTDOWN_TIMEOUT_SECONDS);
+ }
+
+ @Override
+ public DoubleEnvironmentVariable getMemoryManagerUsageThreshold() {
+ return new
DoubleEnvironmentVariable(Envs.SP_MEMORY_MANAGER_USAGE_THRESHOLD);
+ }
+
+ @Override
+ public DoubleEnvironmentVariable getMemoryWarningThreshold() {
+ return new DoubleEnvironmentVariable(Envs.SP_MEMORY_WARNING_THRESHOLD);
+ }
+
@Override
public StringEnvironmentVariable getDatalakeSchedulerCron() {
return new StringEnvironmentVariable(Envs.SP_DATALAKE_SCHEDULER_CRON);
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index cf2f09a055..79cc859251 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -18,7 +18,12 @@
package org.apache.streampipes.commons.environment;
import org.apache.streampipes.commons.environment.model.OAuthConfiguration;
-import org.apache.streampipes.commons.environment.variable.*;
+import
org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
+import
org.apache.streampipes.commons.environment.variable.DoubleEnvironmentVariable;
+import
org.apache.streampipes.commons.environment.variable.FloatEnvironmentVariable;
+import
org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
+import
org.apache.streampipes.commons.environment.variable.LongEnvironmentVariable;
+import
org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
import java.util.List;
@@ -234,5 +239,25 @@ public interface Environment {
BooleanEnvironmentVariable getLoadManagerEnable();
+ //SpRateLimiter
+ LongEnvironmentVariable getRateLimiterDefaultWarmupPeriod();
+ IntEnvironmentVariable getRateLimiterSchedulerInitialDelaySeconds();
+ IntEnvironmentVariable getRateLimiterSchedulerPeriodSeconds();
+ IntEnvironmentVariable getRateLimiterStatsResetThreshold();
+ IntEnvironmentVariable getRateLimiterStatsResetFactor();
+ IntEnvironmentVariable getRateLimiterStatsResetDivisor();
+ IntEnvironmentVariable getRateLimiterShutdownTimeoutSeconds();
+ LongEnvironmentVariable getRateLimiterTimeoutMs();
+ DoubleEnvironmentVariable getRateLimiterPermitsSetPercentage();
+
+ //SpMemoryManager
+ LongEnvironmentVariable getMemoryManagerDefaultInitialMemory();
+ LongEnvironmentVariable getMemoryManagerWaitTimeoutMs();
+ IntEnvironmentVariable getMemorySchedulerInitialDelaySeconds();
+ IntEnvironmentVariable getMemorySchedulerPeriodSeconds();
+ LongEnvironmentVariable getMemoryBytesToMb();
+ IntEnvironmentVariable getMemoryManagerShutdownTimeoutSeconds();
+ DoubleEnvironmentVariable getMemoryManagerUsageThreshold();
+ DoubleEnvironmentVariable getMemoryWarningThreshold();
StringEnvironmentVariable getDatalakeSchedulerCron();
}
diff --git
a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/LongEnvironmentVariable.java
similarity index 70%
copy from
streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java
copy to
streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/LongEnvironmentVariable.java
index a75fbb0446..b7f9cb9ede 100644
---
a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/LongEnvironmentVariable.java
@@ -16,9 +16,16 @@
*
*/
-package org.apache.streampipes.messaging;
+package org.apache.streampipes.commons.environment.variable;
-public interface InternalEventProcessor<T> {
+public class LongEnvironmentVariable extends EnvironmentVariable<Long> {
- void onEvent(T event);
+ public LongEnvironmentVariable(org.apache.streampipes.commons.constants.Envs
envVariable) {
+ super(envVariable);
+ }
+
+ @Override
+ public Long parse(String value) {
+ return Long.parseLong(value);
+ }
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/SpHttpErrorStatusCode.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/SpHttpErrorStatusCode.java
index a4188e9664..9fa784feaa 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/SpHttpErrorStatusCode.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/SpHttpErrorStatusCode.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceStats.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceStats.java
index 5d4f3a8398..7fe693ecd6 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceStats.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceStats.java
@@ -20,8 +20,6 @@ package org.apache.streampipes.commons.prometheus.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
/**
* Service Statistics
*/
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
new file mode 100644
index 0000000000..b1608b004e
--- /dev/null
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerMetrics.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.prometheus.spmemorymanager;
+
+import org.apache.streampipes.commons.prometheus.StreamPipesCollectorRegistry;
+
+import io.prometheus.client.Gauge;
+
+/**
+ * Prometheus gauges exposing the memory manager's usage and allocation rate
+ */
+public class SpMemoryManagerMetrics {
+
+ public static final Gauge MEMORY_USED_BYTES =
StreamPipesCollectorRegistry.registerGauge(
+ "sp_memory_used_bytes",
+ "Amount of memory used in bytes"
+ );
+
+ public static final Gauge MEMORY_ALLOCATION_RATE =
StreamPipesCollectorRegistry.registerGauge(
+ "sp_memory_allocation_rate_bytes_per_second",
+ "Memory allocation rate in bytes per second"
+ );
+
+ public static void updateCoreMetrics(double memoryUsedBytes, double
allocationRate) {
+ MEMORY_USED_BYTES.set(memoryUsedBytes);
+ MEMORY_ALLOCATION_RATE.set(allocationRate);
+ }
+}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerStats.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerStats.java
new file mode 100644
index 0000000000..b006b8e764
--- /dev/null
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spmemorymanager/SpMemoryManagerStats.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.prometheus.spmemorymanager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Memory Manager Statistics
+ */
+public class SpMemoryManagerStats {
+
+ private static final Logger log =
LoggerFactory.getLogger(SpMemoryManagerStats.class);
+
+ public double memoryUsedBytes = 0.0;
+ public double allocationRate = 0.0;
+
+ private final SpMemoryManagerMetrics metrics;
+
+ public SpMemoryManagerStats() {
+ this.metrics = new SpMemoryManagerMetrics();
+ }
+
+ /**
+ * Publish the current memory usage and allocation rate to the Prometheus gauges
+ */
+ public void updateAllMetrics() {
+ SpMemoryManagerMetrics.updateCoreMetrics(memoryUsedBytes, allocationRate);
+ }
+
+ public double getMemoryUsedBytes() {
+ return memoryUsedBytes;
+ }
+
+ public void setMemoryUsedBytes(double memoryUsedBytes) {
+ this.memoryUsedBytes = memoryUsedBytes;
+ }
+
+ public double getAllocationRate() {
+ return allocationRate;
+ }
+
+ public void setAllocationRate(double allocationRate) {
+ this.allocationRate = allocationRate;
+ }
+
+ public SpMemoryManagerMetrics getMetrics() {
+ return metrics;
+ }
+}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
new file mode 100644
index 0000000000..0724cec394
--- /dev/null
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterMetrics.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.prometheus.spratelimiter;
+
+import org.apache.streampipes.commons.prometheus.StreamPipesCollectorRegistry;
+
+import io.prometheus.client.Gauge;
+
+/**
+ * Prometheus gauges exposing the rate limiter's queue size and average wait time
+ */
+public class SpRateLimiterMetrics {
+
+ public static final Gauge RATE_LIMITER_QUEUE_SIZE =
StreamPipesCollectorRegistry.registerGauge(
+ "sp_rate_limiter_queue_size",
+ "Current size of the waiting queue"
+ );
+
+ public static final Gauge RATE_LIMITER_AVERAGE_WAIT_TIME =
StreamPipesCollectorRegistry.registerGauge(
+ "sp_rate_limiter_average_wait_time_seconds",
+ "Average wait time for permit acquisition in seconds"
+ );
+
+ public static void updateCoreMetrics(double queueSize, double
averageWaitTime) {
+ RATE_LIMITER_QUEUE_SIZE.set(queueSize);
+ RATE_LIMITER_AVERAGE_WAIT_TIME.set(averageWaitTime);
+ }
+}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterStats.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterStats.java
new file mode 100644
index 0000000000..d7bf47a61f
--- /dev/null
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/spratelimiter/SpRateLimiterStats.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.commons.prometheus.spratelimiter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Rate Limiter Statistics
+ */
+public class SpRateLimiterStats {
+
+ private static final Logger log =
LoggerFactory.getLogger(SpRateLimiterStats.class);
+
+ public double queueSize = 0.0;
+ public double averageWaitTime = 0.0;
+
+ private final SpRateLimiterMetrics metrics;
+
+ public SpRateLimiterStats() {
+ this.metrics = new SpRateLimiterMetrics();
+ }
+
+ /**
+ * Update all metrics
+ */
+ public void updateAllMetrics() {
+ SpRateLimiterMetrics.updateCoreMetrics(queueSize, averageWaitTime);
+ }
+
+ public double getQueueSize() {
+ return queueSize;
+ }
+
+ public void setQueueSize(double queueSize) {
+ this.queueSize = queueSize;
+ }
+
+ public double getAverageWaitTime() {
+ return averageWaitTime;
+ }
+
+ public void setAverageWaitTime(double averageWaitTime) {
+ this.averageWaitTime = averageWaitTime;
+ }
+
+ public SpRateLimiterMetrics getMetrics() {
+ return metrics;
+ }
+}
diff --git a/streampipes-extensions-api/pom.xml
b/streampipes-extensions-api/pom.xml
index 7239f49c37..c0893d94ed 100644
--- a/streampipes-extensions-api/pom.xml
+++ b/streampipes-extensions-api/pom.xml
@@ -38,6 +38,11 @@
<artifactId>streampipes-commons</artifactId>
<version>0.99.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-commons</artifactId>
+ <version>0.98.0-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-messaging</artifactId>
@@ -48,6 +53,16 @@
<artifactId>streampipes-model</artifactId>
<version>0.99.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-commons</artifactId>
+ <version>0.99.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<properties>
<maven.compiler.source>8</maven.compiler.source>
diff --git
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/limiter/SpRateLimiter.java
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/limiter/SpRateLimiter.java
new file mode 100644
index 0000000000..39ed6fab78
--- /dev/null
+++
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/limiter/SpRateLimiter.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.api.limiter;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
+import
org.apache.streampipes.commons.prometheus.spratelimiter.SpRateLimiterStats;
+
+import com.google.common.util.concurrent.RateLimiter;
+import com.sun.management.OperatingSystemMXBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A singleton rate limiter implementation for StreamPipes extensions.
+ * This class provides rate limiting functionality using Google Guava's
RateLimiter.
+ * It supports configurable permits per second and warmup periods.
+ */
+public enum SpRateLimiter {
+
+ INSTANCE;
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SpRateLimiter.class);
+
+ // Configuration constants
+ private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
+ private final Environment env = Environments.getEnvironment();
+
+ private RateLimiter rateLimiter;
+
+ private double rateLimiterAverageWaitTime = 0.0;
+
+ private long totalWaitTime = 0L;
+ private final AtomicInteger waitTimeCount = new AtomicInteger(0);
+
+ private final AtomicInteger currentQueueSize = new AtomicInteger(0);
+
+ private SpRateLimiterStats stats;
+ private static volatile boolean schedulerInitialized = false;
+ private static ScheduledExecutorService scheduler;
+
+ /**
+ * Creates a rate limiter with default parameters.
+ * Default: permits per second derived from total system memory; warmup period
+ * read from SP_RATE_LIMITER_DEFAULT_WARMUP_PERIOD (1000 ms unless overridden).
+ */
+ public void createRateLimiter() {
+ var defaultPermits = setPermit();
+ var defaultWarmupPeriod =
env.getRateLimiterDefaultWarmupPeriod().getValueOrDefault();
+ createRateLimiter(defaultPermits, defaultWarmupPeriod, DEFAULT_TIME_UNIT);
+ initScheduledTasks();
+ LOG.info("RateLimiter created with {} permits/sec and scheduler
initialized", defaultPermits);
+ }
+
+ /**
+ * Creates a rate limiter with the specified permits per second.
+ * Uses the configured default warmup period (1000 ms unless overridden).
+ *
+ * @param permitsPerSecond The number of permits per second
+ */
+ public void createRateLimiter(double permitsPerSecond) {
+ var defaultWarmupPeriod =
env.getRateLimiterDefaultWarmupPeriod().getValueOrDefault();
+ createRateLimiter(permitsPerSecond, defaultWarmupPeriod,
DEFAULT_TIME_UNIT);
+ initScheduledTasks();
+ LOG.info("RateLimiter created with {} permits/sec and scheduler
initialized", permitsPerSecond);
+ }
+
+ public void initScheduledTasks() {
+ var schedulerInitialDelay =
env.getRateLimiterSchedulerInitialDelaySeconds().getValueOrDefault();
+ var schedulerPeriod =
env.getRateLimiterSchedulerPeriodSeconds().getValueOrDefault();
+ if (!schedulerInitialized) {
+ synchronized (SpRateLimiter.class) {
+ if (!schedulerInitialized) {
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+ scheduler.scheduleAtFixedRate(this::scheduledTask,
schedulerInitialDelay, schedulerPeriod, TimeUnit.SECONDS);
+ schedulerInitialized = true;
+ }
+ }
+ }
+ }
+
+ public void scheduledTask() {
+ this.stats = new SpRateLimiterStats();
+ stats.setAverageWaitTime(this.rateLimiterAverageWaitTime);
+ stats.setQueueSize(this.currentQueueSize.get());
+ stats.updateAllMetrics();
+ }
+
+ /**
+ * Creates a rate limiter with the specified parameters.
+ *
+ * @param permitsPerSecond The number of permits per second
+ * @param warmupPeriod The warmup period
+ * @param unit The time unit for the warmup period
+ * @throws IllegalArgumentException if parameters are invalid
+ */
+ public void createRateLimiter(double permitsPerSecond, long warmupPeriod,
TimeUnit unit) {
+ if (this.rateLimiter == null) {
+ validateParameters(permitsPerSecond, warmupPeriod, unit);
+ this.rateLimiter = RateLimiter.create(permitsPerSecond, warmupPeriod,
unit);
+ LOG.info("RateLimiter created with {} permits per second, warmup period:
{} {}",
+ permitsPerSecond, warmupPeriod, unit);
+ } else {
+ LOG.warn("RateLimiter already exists. Use setRate() to modify the rate
instead.");
+ }
+ }
+
+ /**
+ * Tries to acquire permits from the rate limiter for processing data, with timeout.
+ * Each request consumes {@code bytes} permits (the value is narrowed to an int),
+ * so the configured rate acts as a bytes-per-second budget, not a request count.
+ *
+ * @param bytes The number of bytes to process; used as the number of permits to acquire
+ * @return true if the permits were acquired within the configured timeout, false if the
+ * timeout elapsed or the rate limiter has not been initialized
+ * @throws InterruptedException if the current thread is interrupted while
waiting
+ */
+ public boolean limit(long bytes) throws InterruptedException {
+ var timeOutMs = env.getRateLimiterTimeoutMs().getValueOrDefault();
+ if (this.rateLimiter == null) {
+ LOG.warn("RateLimiter has not been initialized. Please call
createRateLimiter() first.");
+ return false;
+ }
+
+ long startTime = System.currentTimeMillis();
+
+ currentQueueSize.incrementAndGet();
+
+ try {
+ int permits = (int) bytes;
+ long timeoutMs = timeOutMs;
+ boolean acquired = rateLimiter.tryAcquire(permits, timeoutMs,
TimeUnit.MILLISECONDS);
+
+ long waitTime = System.currentTimeMillis() - startTime;
+ updateAverageWaitTime(waitTime);
+
+ if (!acquired) {
+ LOG.warn("Failed to acquire permit for {} bytes within {} ms timeout
(rate: {} requests/sec)",
+ bytes, timeoutMs, rateLimiter.getRate());
+ } else {
+ LOG.debug("Successfully acquired permit for {} bytes in {} ms (rate:
{} requests/sec)",
+ bytes, waitTime, rateLimiter.getRate());
+ }
+ return acquired;
+ } finally {
+ currentQueueSize.decrementAndGet();
+ }
+ }
+
+ /**
+ * Calculates the default number of permits as a configurable fraction of the
+ * total memory size reported by the operating system MXBean (not the JVM heap).
+ *
+ * @return The calculated number of permits
+ */
+ public static double setPermit() {
+ var permitsSetPercentage =
Environments.getEnvironment().getRateLimiterPermitsSetPercentage().getValueOrDefault();
+ OperatingSystemMXBean systemMXBean = (OperatingSystemMXBean)
java.lang.management.ManagementFactory.getOperatingSystemMXBean();
+ return systemMXBean.getTotalMemorySize() * permitsSetPercentage;
+ }
+
+  /**
+   * Changes the rate of an already created rate limiter.
+   *
+   * @param permitsPerSecond the new rate in permits per second
+   * @throws IllegalStateException if no rate limiter has been created yet
+   */
+  public void setRate(double permitsPerSecond) {
+    if (this.rateLimiter == null) {
+      throw new IllegalStateException("RateLimiter has not been initialized.");
+    }
+    this.rateLimiter.setRate(permitsPerSecond);
+    LOG.info("RateLimiter rate updated to {} permits per second", permitsPerSecond);
+  }
+
+  /**
+   * Returns the rate the limiter is currently configured with.
+   *
+   * @return the current rate in permits per second
+   * @throws IllegalStateException if no rate limiter has been created yet
+   */
+  public double getRate() {
+    if (this.rateLimiter == null) {
+      throw new IllegalStateException("RateLimiter has not been initialized.");
+    }
+    return this.rateLimiter.getRate();
+  }
+
+  /**
+   * Tells whether a rate limiter instance currently exists.
+   *
+   * @return true if a rate limiter has been created and not reset since, false otherwise
+   */
+  public boolean isInitialized() {
+    boolean limiterPresent = this.rateLimiter != null;
+    return limiterPresent;
+  }
+
+  /**
+   * Drops the current rate limiter so that the manager is uninitialized again.
+   * Does nothing (and logs nothing) when no limiter exists.
+   */
+  public void reset() {
+    if (this.rateLimiter == null) {
+      return;
+    }
+    this.rateLimiter = null;
+    LOG.info("RateLimiter has been reset");
+  }
+
+  /**
+   * Returns the queue size metric, i.e. the value of the counter that {@code limit()} increments
+   * on entry and decrements on exit.
+   *
+   * @return the current queue size
+   */
+  public int getCurrentQueueSize() {
+    int queued = currentQueueSize.get();
+    return queued;
+  }
+
+  /**
+   * Returns the average wait time metric maintained by the rate limiter.
+   *
+   * @return the average wait time in seconds
+   */
+  public double getRateLimiterAverageWaitTime() {
+    return this.rateLimiterAverageWaitTime;
+  }
+
+  /**
+   * Overwrites the average wait time metric with the given value.
+   *
+   * @param averageWaitTime the average wait time in seconds
+   */
+  public void setRateLimiterAverageWaitTime(double averageWaitTime) {
+    rateLimiterAverageWaitTime = averageWaitTime;
+  }
+
+
+  /**
+   * Checks the arguments used to build a rate limiter.
+   *
+   * @param permitsPerSecond the requested rate; must be greater than zero and not NaN
+   * @param warmupPeriod the warmup period; must not be negative
+   * @param unit the unit of {@code warmupPeriod}; must not be null
+   * @throws IllegalArgumentException if any argument violates the constraints above
+   */
+  private void validateParameters(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
+    // Written as !(x > 0) so that NaN is rejected as well; NaN <= 0 evaluates to false.
+    if (!(permitsPerSecond > 0)) {
+      throw new IllegalArgumentException("permitsPerSecond must be positive, got: " + permitsPerSecond);
+    }
+    if (warmupPeriod < 0) {
+      throw new IllegalArgumentException("warmupPeriod must be non-negative, got: " + warmupPeriod);
+    }
+    if (unit == null) {
+      throw new IllegalArgumentException("TimeUnit cannot be null");
+    }
+  }
+
+  /**
+   * Folds one observed wait time into the running average wait time metric, which is stored in
+   * seconds.
+   *
+   * <p>The method is synchronized because it is called from {@code limit()}: the
+   * read-modify-write on {@code totalWaitTime} and the decay step below would otherwise lose
+   * updates when several threads finish waiting at the same time.
+   *
+   * <p>Once the sample count exceeds the configured reset threshold, the accumulated total is
+   * scaled by factor/divisor and the sample count is set to the factor.
+   * NOTE(review): total and count are scaled differently, so the average may jump right after a
+   * reset - confirm this is intended.
+   *
+   * @param waitTimeMs the wait time of one {@code limit()} call in milliseconds
+   */
+  private synchronized void updateAverageWaitTime(long waitTimeMs) {
+    totalWaitTime += waitTimeMs;
+    int currentCount = waitTimeCount.incrementAndGet();
+
+    // Milliseconds -> seconds.
+    rateLimiterAverageWaitTime = (double) totalWaitTime / currentCount / 1000.0;
+
+    if (currentCount > env.getRateLimiterStatsResetThreshold().getValueOrDefault()) {
+      // The decay settings are only looked up when a reset is actually due.
+      var statsResetFactor = env.getRateLimiterStatsResetFactor().getValueOrDefault();
+      var statsResetDivisor = env.getRateLimiterStatsResetDivisor().getValueOrDefault();
+      totalWaitTime = totalWaitTime * statsResetFactor / statsResetDivisor;
+      waitTimeCount.set(statsResetFactor);
+    }
+  }
+
+  /**
+   * Returns the stats object currently held by the rate limiter.
+   *
+   * @return the rate limiter stats
+   */
+  public SpRateLimiterStats getStats() {
+    return this.stats;
+  }
+
+  /**
+   * Forces the queue size metric back to zero and logs that this happened.
+   */
+  public void resetQueueSize() {
+    final int emptyQueue = 0;
+    currentQueueSize.set(emptyQueue);
+    LOG.info("Queue size has been reset");
+  }
+
+  /**
+   * Stops the scheduler of the rate limiter.
+   *
+   * <p>Requests an orderly shutdown, waits up to the configured shutdown timeout (in seconds)
+   * and forces termination if the scheduler has not finished by then. If the waiting thread is
+   * interrupted, termination is forced and the interrupt flag is restored. Nothing happens when
+   * there is no scheduler or it is already shut down.
+   */
+  public void shutdown() {
+    var shutdownTimeoutSeconds = env.getRateLimiterShutdownTimeoutSeconds().getValueOrDefault();
+    if (scheduler == null || scheduler.isShutdown()) {
+      return;
+    }
+
+    scheduler.shutdown();
+    try {
+      boolean terminated = scheduler.awaitTermination(shutdownTimeoutSeconds, TimeUnit.SECONDS);
+      if (!terminated) {
+        scheduler.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      scheduler.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.warn("RateLimiter scheduler shutdown interrupted", e);
+    }
+  }
+}
diff --git
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/memorymanager/SpMemoryManager.java
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/memorymanager/SpMemoryManager.java
new file mode 100644
index 0000000000..5a3836e332
--- /dev/null
+++
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/memorymanager/SpMemoryManager.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.api.memorymanager;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
+import
org.apache.streampipes.commons.prometheus.spmemorymanager.SpMemoryManagerStats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * A singleton memory manager for StreamPipes extensions.
+ *
+ * <p>The manager keeps a budget of "free" bytes that starts at the configured initial memory.
+ * {@link #allocate(long)} reserves bytes from that budget and waits while the budget is too
+ * small; {@link #free(long)} gives bytes back. A periodic task publishes metrics and switches
+ * the {@link #isMemoryBlocked() blocked} and {@link #isMemoryWarningActive() warning} states
+ * based on the configured usage thresholds.
+ *
+ * <p>All scheduler state is kept in instance fields on purpose: static fields of an enum are
+ * initialized after its constants are constructed, so a static "initialized" flag written from
+ * the constructor would be reset by its own static initializer afterwards.
+ */
+public enum SpMemoryManager {
+
+  INSTANCE;
+
+  private static final Logger LOG = LoggerFactory.getLogger(SpMemoryManager.class);
+
+  /** How long a waiting {@link #allocate(long)} parks between two looks at the budget (1 ms). */
+  private static final long WAIT_POLL_NANOS = 1_000_000L;
+
+  /** Minimum length of the window over which the allocation rate is averaged. */
+  private static final long RATE_WINDOW_MS = 1_000L;
+
+  private final Environment env = Environments.getEnvironment();
+
+  private final AtomicLong freeMemory;
+
+  // Metrics are written by allocating threads and read by the scheduler thread.
+  private volatile double memoryUsedBytes = 0.0;
+  private volatile double memoryAllocationRate = 0.0;
+
+  private final AtomicLong totalAllocatedBytes = new AtomicLong();
+
+  // Allocation-rate window; both fields are guarded by rateLock.
+  private final Object rateLock = new Object();
+  private long rateWindowStart = System.currentTimeMillis();
+  private long rateWindowBytes = 0L;
+
+  // Memory control state
+  private volatile boolean memoryBlocked = false;
+  private volatile boolean memoryWarningActive = false;
+
+  private volatile SpMemoryManagerStats stats = new SpMemoryManagerStats();
+
+  // Guarded by schedulerLock.
+  private final Object schedulerLock = new Object();
+  private ScheduledExecutorService scheduler;
+
+  SpMemoryManager() {
+    this.freeMemory = new AtomicLong(env.getMemoryManagerDefaultInitialMemory().getValueOrDefault());
+    initScheduledTask();
+  }
+
+  /**
+   * Starts the periodic metrics/threshold task if it is not already running.
+   * Calling this method while a scheduler is active is a no-op; after {@link #shutdown()} it
+   * starts a fresh scheduler.
+   */
+  public void initScheduledTask() {
+    synchronized (schedulerLock) {
+      if (scheduler != null && !scheduler.isShutdown()) {
+        return;
+      }
+      scheduler = Executors.newScheduledThreadPool(1);
+      scheduler.scheduleAtFixedRate(this::runScheduledTaskSafely,
+          env.getMemorySchedulerInitialDelaySeconds().getValueOrDefault(),
+          env.getMemorySchedulerPeriodSeconds().getValueOrDefault(),
+          java.util.concurrent.TimeUnit.SECONDS);
+    }
+  }
+
+  /**
+   * Runs {@link #scheduledTask()} and logs unexpected failures instead of letting them escape,
+   * because an exception thrown by a fixed-rate task cancels all of its future executions.
+   */
+  private void runScheduledTaskSafely() {
+    try {
+      scheduledTask();
+    } catch (RuntimeException e) {
+      LOG.error("Memory manager scheduled task failed", e);
+    }
+  }
+
+  /**
+   * Publishes the current metrics through a new stats object and re-evaluates the usage
+   * thresholds.
+   */
+  public void scheduledTask() {
+    SpMemoryManagerStats freshStats = new SpMemoryManagerStats();
+    freshStats.setAllocationRate(this.getMemoryAllocationRate());
+    freshStats.setMemoryUsedBytes(this.getMemoryUsedBytes());
+    freshStats.updateAllMetrics();
+    this.stats = freshStats;
+
+    // Check memory usage thresholds
+    checkMemoryThresholds();
+  }
+
+  /**
+   * Checks memory usage against the configured thresholds and updates the blocked and warning
+   * states. Blocking starts at the usage threshold and is only lifted once usage has dropped to
+   * the warning threshold or below.
+   */
+  private void checkMemoryThresholds() {
+    double memoryUsageRatio = getMemoryUsagePercentage();
+    double usageThreshold = env.getMemoryManagerUsageThreshold().getValueOrDefault();
+    double warningThreshold = env.getMemoryWarningThreshold().getValueOrDefault();
+
+    if (memoryUsageRatio >= usageThreshold) {
+      if (!memoryBlocked) {
+        memoryBlocked = true;
+        LOG.warn("Memory usage reached {}% threshold. Blocking data consumption.",
+            (int) (usageThreshold * 100));
+      }
+    } else if (memoryUsageRatio <= warningThreshold) {
+      if (memoryBlocked) {
+        memoryBlocked = false;
+        LOG.info("Memory usage dropped below {}% threshold. Resuming data consumption.",
+            (int) (warningThreshold * 100));
+      }
+    }
+
+    // Update warning state
+    memoryWarningActive = memoryUsageRatio >= warningThreshold && memoryUsageRatio < usageThreshold;
+  }
+
+  /**
+   * Reserves the specified amount of memory from the budget.
+   *
+   * <p>If the budget is currently too small, the calling thread waits, re-checking roughly every
+   * millisecond, until enough memory has been freed or the thread is interrupted. The method
+   * returns without reserving anything (after logging a warning) when
+   * <ul>
+   *   <li>{@code bytes} is not positive,</li>
+   *   <li>allocation is currently blocked by the usage threshold,</li>
+   *   <li>{@code bytes} exceeds the total managed memory, which could never be satisfied, or</li>
+   *   <li>the waiting thread is interrupted (the interrupt flag is left set).</li>
+   * </ul>
+   * NOTE(review): callers cannot tell whether memory was actually reserved; consider returning a
+   * boolean in a follow-up change.
+   *
+   * @param bytes the number of bytes to allocate
+   */
+  public void allocate(long bytes) {
+    if (bytes <= 0) {
+      LOG.warn("Attempted to allocate non-positive memory: {} bytes", bytes);
+      return;
+    }
+
+    // Check if memory is blocked due to threshold
+    if (memoryBlocked) {
+      // getMemoryUsagePercentage() is a 0..1 ratio; scale it to match the "%" in the message.
+      LOG.warn("Memory allocation blocked due to high memory usage threshold. "
+          + "Current usage: {}%", getMemoryUsagePercentage() * 100);
+      return;
+    }
+
+    long totalMemory = totalMemory();
+    if (bytes > totalMemory) {
+      // Waiting would never end: even a completely free budget is too small.
+      LOG.warn("Cannot allocate {} bytes: the request exceeds the total managed memory of {} bytes",
+          bytes, totalMemory);
+      return;
+    }
+
+    long waitTimeoutMs = env.getMemoryManagerWaitTimeoutMs().getValueOrDefault();
+    boolean waiting = false;
+    long lastWarning = 0L;
+
+    // Loop until enough memory is available
+    while (true) {
+      long currentFree = freeMemory.get();
+      long newFreeMemory = currentFree - bytes;
+
+      if (newFreeMemory >= 0) {
+        // Try to atomically update the free memory; on a lost race simply retry.
+        if (freeMemory.compareAndSet(currentFree, newFreeMemory)) {
+          memoryUsedBytes = (double) (totalMemory - newFreeMemory);
+          updateAllocationRate(bytes);
+          return;
+        }
+        continue;
+      }
+
+      // Insufficient memory: warn once per timeout window instead of on every poll.
+      long now = System.currentTimeMillis();
+      if (!waiting || now - lastWarning >= waitTimeoutMs) {
+        LOG.warn("Not enough free memory to allocate {} bytes. Current free memory: {} bytes. "
+            + "Blocking allocation.", bytes, currentFree);
+        waiting = true;
+        lastWarning = now;
+      }
+
+      LockSupport.parkNanos(WAIT_POLL_NANOS);
+      if (Thread.currentThread().isInterrupted()) {
+        LOG.warn("Memory allocation blocking was interrupted");
+        return;
+      }
+    }
+  }
+
+  /**
+   * Returns the specified amount of memory to the budget.
+   *
+   * <p>The budget never grows beyond the total managed memory: callers may free memory whose
+   * allocation was skipped (see {@link #allocate(long)}), and without the cap such calls would
+   * inflate the budget. Threads waiting in {@link #allocate(long)} notice the freed memory on
+   * their next poll. Non-positive values are ignored with a warning.
+   *
+   * @param bytes the number of bytes to free
+   */
+  public void free(long bytes) {
+    if (bytes <= 0) {
+      LOG.warn("Attempted to free non-positive memory: {} bytes", bytes);
+      return;
+    }
+
+    long totalMemory = totalMemory();
+    long currentFree;
+    long newFreeMemory;
+    do {
+      currentFree = freeMemory.get();
+      long candidate = currentFree + bytes;
+      // candidate < currentFree detects long overflow.
+      newFreeMemory = (candidate < currentFree || candidate > totalMemory) ? totalMemory : candidate;
+    } while (!freeMemory.compareAndSet(currentFree, newFreeMemory));
+
+    memoryUsedBytes = (double) (totalMemory - newFreeMemory);
+  }
+
+  /**
+   * Gets the current amount of free memory.
+   *
+   * @return The current free memory in bytes
+   */
+  public long getFreeMemory() {
+    return freeMemory.get();
+  }
+
+  /**
+   * Gets the total allocated memory (initial memory - free memory).
+   *
+   * @return The total allocated memory in bytes
+   */
+  public long getAllocatedMemory() {
+    return totalMemory() - freeMemory.get();
+  }
+
+  /**
+   * Gets the memory used bytes metric.
+   *
+   * @return The memory used in bytes
+   */
+  public double getMemoryUsedBytes() {
+    return memoryUsedBytes;
+  }
+
+  /**
+   * Sets the memory used bytes metric.
+   *
+   * @param usedBytes The memory used in bytes
+   */
+  public void setMemoryUsedBytes(double usedBytes) {
+    this.memoryUsedBytes = usedBytes;
+  }
+
+  /**
+   * Gets the memory allocation rate metric.
+   *
+   * @return The allocation rate in bytes per second
+   */
+  public double getMemoryAllocationRate() {
+    return memoryAllocationRate;
+  }
+
+  /**
+   * Sets the memory allocation rate metric.
+   *
+   * @param allocationRate The allocation rate in bytes per second
+   */
+  public void setMemoryAllocationRate(double allocationRate) {
+    this.memoryAllocationRate = allocationRate;
+  }
+
+  /**
+   * Records a successful allocation for the cumulative byte counter and the allocation rate.
+   *
+   * <p>The rate is the number of bytes allocated in the current window divided by the window's
+   * length in seconds; it is recomputed whenever an allocation arrives at least
+   * {@link #RATE_WINDOW_MS} ms after the window started, and a new window begins then.
+   *
+   * @param allocatedBytes the number of bytes that were just allocated
+   */
+  private void updateAllocationRate(long allocatedBytes) {
+    totalAllocatedBytes.addAndGet(allocatedBytes);
+
+    synchronized (rateLock) {
+      rateWindowBytes += allocatedBytes;
+
+      long currentTime = System.currentTimeMillis();
+      long elapsedMs = currentTime - rateWindowStart;
+      if (elapsedMs >= RATE_WINDOW_MS) {
+        memoryAllocationRate = rateWindowBytes / (elapsedMs / 1000.0);
+        rateWindowBytes = 0L;
+        rateWindowStart = currentTime;
+      }
+    }
+  }
+
+  /**
+   * Gets the cumulative number of bytes handed out by successful allocations.
+   *
+   * @return The total number of allocated bytes since start-up
+   */
+  public long getTotalAllocatedBytes() {
+    return totalAllocatedBytes.get();
+  }
+
+  /**
+   * Gets the stats object created by the most recent run of the scheduled task.
+   *
+   * @return The memory manager stats
+   */
+  public SpMemoryManagerStats getStats() {
+    return stats;
+  }
+
+  /**
+   * Gets the current memory usage as a ratio of the total managed memory.
+   *
+   * @return The memory usage ratio (0.0 to 1.0)
+   */
+  public double getMemoryUsagePercentage() {
+    return (double) getAllocatedMemory() / totalMemory();
+  }
+
+  /**
+   * Checks if memory allocation is currently blocked due to high usage.
+   *
+   * @return true if memory allocation is blocked, false otherwise
+   */
+  public boolean isMemoryBlocked() {
+    return memoryBlocked;
+  }
+
+  /**
+   * Checks if memory warning is currently active.
+   *
+   * @return true if memory warning is active, false otherwise
+   */
+  public boolean isMemoryWarningActive() {
+    return memoryWarningActive;
+  }
+
+  /**
+   * Gets the current memory usage in MB.
+   *
+   * @return The memory usage in MB
+   */
+  public double getMemoryUsageMB() {
+    return getAllocatedMemory() / (double) env.getMemoryBytesToMb().getValueOrDefault();
+  }
+
+  /**
+   * Gets the total available memory in MB.
+   *
+   * @return The total available memory in MB
+   */
+  public double getTotalMemoryMB() {
+    return totalMemory() / (double) env.getMemoryBytesToMb().getValueOrDefault();
+  }
+
+  /**
+   * Stops the scheduler of the singleton instance. Waits up to the configured shutdown timeout
+   * (in seconds) for a running task to finish and forces termination afterwards.
+   */
+  public static void shutdown() {
+    INSTANCE.stopScheduler();
+  }
+
+  /** Instance-side implementation of {@link #shutdown()}. */
+  private void stopScheduler() {
+    ScheduledExecutorService toStop;
+    synchronized (schedulerLock) {
+      toStop = scheduler;
+    }
+    if (toStop == null || toStop.isShutdown()) {
+      return;
+    }
+
+    toStop.shutdown();
+    try {
+      if (!toStop.awaitTermination(env.getMemoryManagerShutdownTimeoutSeconds().getValueOrDefault(),
+          java.util.concurrent.TimeUnit.SECONDS)) {
+        toStop.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      toStop.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.warn("Memory manager scheduler shutdown was interrupted", e);
+    }
+  }
+
+  /** Total managed memory in bytes, i.e. the configured initial memory. */
+  private long totalMemory() {
+    return env.getMemoryManagerDefaultInitialMemory().getValueOrDefault();
+  }
+}
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/HttpServerAdapterManagement.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/HttpServerAdapterManagement.java
index 64b6c2c510..9529810102 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/HttpServerAdapterManagement.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/HttpServerAdapterManagement.java
@@ -41,7 +41,7 @@ public enum HttpServerAdapterManagement {
this.httpServerAdapters.remove(endpointId);
}
- public void notify(String endpointId, byte[] event) throws
IllegalArgumentException {
+ public void notify(String endpointId, byte[] event) throws
IllegalArgumentException, InterruptedException {
if (httpServerAdapters.containsKey(endpointId)) {
httpServerAdapters.get(endpointId).onEvent(event);
} else {
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ServiceLoadDataReportGenerator.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ServiceLoadDataReportGenerator.java
index 35035fab3c..9503486e96 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ServiceLoadDataReportGenerator.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/monitoring/ServiceLoadDataReportGenerator.java
@@ -211,6 +211,7 @@ public class ServiceLoadDataReportGenerator {
// Update service stats
serviceStats.setCpuUsage(currentSnapshot.getCpu().getUsage());
serviceStats.setMemoryUsage(currentSnapshot.getMemory().getUsage());
+ serviceStats.setWeight(newReport.getWeight());
serviceStats.setSystemLoad(currentSnapshot.getAverageUsagePercent());
serviceStats.setHistoricalSystemLoad(historicalSnapshot.getAverageUsagePercent());
serviceStats.updateAllMetrics();
diff --git
a/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/adapter/RocketMQConsumer.java
b/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/adapter/RocketMQConsumer.java
index d062340782..11620e9881 100644
---
a/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/adapter/RocketMQConsumer.java
+++
b/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/adapter/RocketMQConsumer.java
@@ -22,11 +22,14 @@ import
org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.slf4j.Logger;
import java.io.IOException;
public class RocketMQConsumer implements Runnable {
+ Logger logger = org.slf4j.LoggerFactory.getLogger(RocketMQConsumer.class);
+
private InternalEventProcessor<byte[]> eventProcessor;
private String brokerUrl;
private String topic;
@@ -48,7 +51,12 @@ public class RocketMQConsumer implements Runnable {
public void run() {
try {
this.consumer = RocketMQUtils.createConsumer(brokerUrl, topic,
consumerGroup, messageView -> {
- eventProcessor.onEvent(messageView.getBody().array());
+ try {
+ eventProcessor.onEvent(messageView.getBody().array());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("Event processing was interrupted", e);
+ }
return ConsumeResult.SUCCESS;
});
} catch (ClientException e) {
diff --git
a/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/sink/RocketMQPublisherSink.java
b/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/sink/RocketMQPublisherSink.java
index 4e1df94272..b46da76632 100644
---
a/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/sink/RocketMQPublisherSink.java
+++
b/streampipes-extensions/streampipes-connectors-rocketmq/src/main/java/org/apache/streampipes/extensions/connectors/rocketmq/sink/RocketMQPublisherSink.java
@@ -19,8 +19,8 @@ package
org.apache.streampipes.extensions.connectors.rocketmq.sink;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
diff --git
a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
index 3cef5b98fc..80a6a4a52e 100644
---
a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
+++
b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQConsumer.java
@@ -25,6 +25,7 @@ import
org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.util.ByteSequence;
+import org.slf4j.Logger;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
@@ -37,6 +38,8 @@ public class ActiveMQConsumer extends
ActiveMQConnectionProvider implements
EventConsumer,
AutoCloseable, Serializable {
+ Logger logger = org.slf4j.LoggerFactory.getLogger(ActiveMQConsumer.class);
+
private Session session;
private MessageConsumer consumer;
private InternalEventProcessor<byte[]> eventProcessor;
@@ -52,7 +55,12 @@ public class ActiveMQConsumer extends
ActiveMQConnectionProvider implements
consumer.setMessageListener(message -> {
if (message instanceof BytesMessage) {
ByteSequence bs = ((ActiveMQBytesMessage) message).getContent();
- eventProcessor.onEvent(bs.getData());
+ try {
+ eventProcessor.onEvent(bs.getData());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("Event processing was interrupted", e);
+ }
}
});
diff --git
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index 8413452765..ec2adcb80b 100644
---
a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++
b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -74,7 +74,14 @@ public class SpKafkaConsumer implements EventConsumer,
Runnable,
Duration duration = Duration.of(100, ChronoUnit.MILLIS);
while (isRunning) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
- records.forEach(record -> eventProcessor.onEvent(record.value()));
+ records.forEach(record -> {
+ try {
+ eventProcessor.onEvent(record.value());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Event processing interrupted in KafkaConsumer", e);
+ }
+ });
}
consumer.close();
}
diff --git
a/streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarConsumer.java
b/streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarConsumer.java
index cd2cfaa856..9336489a1d 100644
---
a/streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarConsumer.java
+++
b/streampipes-messaging-pulsar/src/main/java/org/apache/streampipes/messaging/pulsar/PulsarConsumer.java
@@ -28,9 +28,12 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
public class PulsarConsumer implements EventConsumer {
+ Logger logger = org.slf4j.LoggerFactory.getLogger(PulsarConsumer.class);
+
private PulsarClient pulsarClient;
private Consumer<byte[]> consumer;
private PulsarTransportProtocol protocolSettings;
@@ -57,7 +60,12 @@ public class PulsarConsumer implements EventConsumer {
.messageListener(new MessageListener<byte[]>() {
@Override
public void received(Consumer<byte[]> consumer, Message<byte[]>
msg) {
- eventProcessor.onEvent(msg.getData());
+ try {
+ eventProcessor.onEvent(msg.getData());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("Event processing interrupted in
PulsarConsumer", e);
+ }
}
})
.subscribe();
diff --git
a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java
b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java
index a75fbb0446..5f1f878551 100644
---
a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java
+++
b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/InternalEventProcessor.java
@@ -20,5 +20,5 @@ package org.apache.streampipes.messaging;
public interface InternalEventProcessor<T> {
- void onEvent(T event);
+ // onEvent may now throw InterruptedException. The consumers changed in this commit
+ // (RocketMQ, ActiveMQ, Kafka, Pulsar) catch it, restore the thread's interrupt flag
+ // and log a warning.
+ void onEvent(T event) throws InterruptedException;
}
diff --git
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
index eee0c27d59..d25b2b45c1 100644
---
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
+++
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.service.extensions;
import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.extensions.api.limiter.SpRateLimiter;
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
@@ -81,6 +82,7 @@ public abstract class StreamPipesExtensionsServiceBase
extends StreamPipesServic
String serviceId = serviceDef.getServiceGroup() + "-" +
AUTO_GENERATED_SERVICE_ID;
serviceDef.setServiceId(serviceId);
DeclarersSingleton.getInstance().populate(networkingConfig.getHost(),
networkingConfig.getPort(), serviceDef);
+ SpRateLimiter.INSTANCE.createRateLimiter();
startExtensionsService(this.getClass(), serviceDef, networkingConfig);
ServiceLoadDataReportGenerator.getInstance().initialize();
} catch (UnknownHostException e) {
diff --git
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index 8ba006df54..82e58d0587 100644
---
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -109,10 +109,8 @@ public abstract class StreamPipesFunction implements
IStreamPipesFunctionDeclare
public void process(Map<String, Object> rawEvent, long size, String
topicName) {
try {
var sourceInfo = sourceInfoMapper.get(topicName);
-
var event = EventFactory
.fromMap(rawEvent, sourceInfo, schemaInfoMapper.get(topicName));
-
this.onEvent(event, sourceInfo.getSourceId());
increaseCounter(sourceInfo.getSourceId(), size);
} catch (RuntimeException e) {
diff --git
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
index 6025ece466..947e28f0be 100644
---
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
+++
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/routing/StandaloneSpInputCollector.java
@@ -19,6 +19,8 @@
package org.apache.streampipes.wrapper.standalone.routing;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.limiter.SpRateLimiter;
+import org.apache.streampipes.extensions.api.memorymanager.SpMemoryManager;
import org.apache.streampipes.extensions.api.pe.routing.RawDataProcessor;
import org.apache.streampipes.extensions.api.pe.routing.SpInputCollector;
import org.apache.streampipes.messaging.EventConsumer;
@@ -42,12 +44,15 @@ public class StandaloneSpInputCollector<T extends
TransportProtocol> extends
}
@Override
- public void onEvent(byte[] event) {
+ public void onEvent(byte[] event) throws InterruptedException {
+ // Throttle first. The result of limit() is not checked, so a timed-out limiter only
+ // delays the event; it is still forwarded.
+ SpRateLimiter.INSTANCE.limit(event.length);
+ SpMemoryManager.INSTANCE.allocate(event.length);
+ try {
if (singletonEngine) {
send(consumers.get(consumers.keySet().toArray()[0]), event);
} else {
consumers.forEach((key, value) -> send(value, event));
}
+ } finally {
+ // Release the reservation even when a consumer throws; otherwise the bytes would stay
+ // accounted as used.
+ SpMemoryManager.INSTANCE.free(event.length);
+ }
}
private void send(RawDataProcessor rawDataProcessor, byte[] event) {
diff --git
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
index 272081f768..aebf71046a 100644
---
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
+++
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
@@ -20,6 +20,8 @@ package org.apache.streampipes.wrapper.standalone.runtime;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import
org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor;
+import org.apache.streampipes.extensions.api.limiter.SpRateLimiter;
+import org.apache.streampipes.extensions.api.memorymanager.SpMemoryManager;
import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
@@ -64,6 +66,8 @@ public class StandaloneEventProcessorRuntime extends
StandalonePipelineElementRu
@Override
public void process(Map<String, Object> rawEvent, long size, String
sourceInfo) {
try {
+ SpRateLimiter.INSTANCE.limit(size);
+ SpMemoryManager.INSTANCE.allocate(size);
monitoringManager.increaseInCounter(instanceId, sourceInfo, size,
System.currentTimeMillis());
var event = this.internalRuntimeParameters.makeEvent(runtimeParameters,
rawEvent, sourceInfo);
pipelineElement
@@ -74,6 +78,11 @@ public class StandaloneEventProcessorRuntime extends
StandalonePipelineElementRu
} catch (RuntimeException e) {
LOG.error("RuntimeException while processing event in {}",
pipelineElement.getClass().getCanonicalName(), e);
addLogEntry(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Event processing interrupted", e);
+ } finally {
+ SpMemoryManager.INSTANCE.free(size);
}
}
diff --git
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
index e87d3cd930..859468a761 100644
---
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
+++
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java
@@ -20,6 +20,8 @@ package org.apache.streampipes.wrapper.standalone.runtime;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.limiter.SpRateLimiter;
+import org.apache.streampipes.extensions.api.memorymanager.SpMemoryManager;
import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
@@ -51,11 +53,18 @@ public class StandaloneEventSinkRuntime extends
StandalonePipelineElementRuntime
@Override
public void process(Map<String, Object> rawEvent, long size, String
sourceInfo) {
+ // Tracks whether allocate() was reached, so that the finally block does not free memory
+ // when limit() or allocate() threw before anything could be reserved.
+ boolean memoryReserved = false;
try {
+ SpRateLimiter.INSTANCE.limit(size);
+ SpMemoryManager.INSTANCE.allocate(size);
+ memoryReserved = true;
monitoringManager.increaseInCounter(instanceId, sourceInfo, size,
System.currentTimeMillis());
pipelineElement.onEvent(internalRuntimeParameters.makeEvent(runtimeParameters,
rawEvent, sourceInfo));
} catch (RuntimeException e) {
LOG.error("RuntimeException while processing event in {}",
pipelineElement.getClass().getCanonicalName(), e);
addLogEntry(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Event processing interrupted in {}",
pipelineElement.getClass().getCanonicalName(), e);
+ } finally {
+ if (memoryReserved) {
+ SpMemoryManager.INSTANCE.free(size);
+ }
}
}