This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push: new b62ed11 [api][runtime] Introduce metric mechanism (#74) b62ed11 is described below commit b62ed1158c98cfcce528b8cc172e6896cd29fca1 Author: Eugene <105473769+greateugen...@users.noreply.github.com> AuthorDate: Thu Jul 31 17:42:33 2025 +0800 [api][runtime] Introduce metric mechanism (#74) --- api/pom.xml | 9 ++ .../flink/agents/api/context/RunnerContext.java | 15 ++ .../agents/api/metrics/FlinkAgentsMetricGroup.java | 92 ++++++++++++ .../flink/agents/api/metrics/UpdatableGauge.java | 37 +++++ python/flink_agents/api/metric_group.py | 139 ++++++++++++++++++ python/flink_agents/api/runner_context.py | 21 +++ python/flink_agents/runtime/flink_metric_group.py | 160 +++++++++++++++++++++ .../flink_agents/runtime/flink_runner_context.py | 23 +++ python/flink_agents/runtime/local_runner.py | 13 ++ .../agents/runtime/context/RunnerContextImpl.java | 24 +++- .../runtime/metrics/BuiltInActionMetrics.java | 44 ++++++ .../agents/runtime/metrics/BuiltInMetrics.java | 72 ++++++++++ .../metrics/FlinkAgentsMetricGroupImpl.java | 104 ++++++++++++++ .../agents/runtime/metrics/UpdatableGaugeImpl.java | 42 ++++++ .../runtime/operator/ActionExecutionOperator.java | 25 +++- .../python/context/PythonRunnerContextImpl.java | 7 +- .../runtime/python/utils/PythonActionExecutor.java | 12 +- .../metrics/FlinkAgentsMetricGroupImplTest.java | 110 ++++++++++++++ .../runtime/metrics/UpdatableGaugeImplTest.java | 145 +++++++++++++++++++ 19 files changed, 1083 insertions(+), 11 deletions(-) diff --git a/api/pom.xml b/api/pom.xml index d371967..512895a 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -28,4 +28,13 @@ under the License. 
<artifactId>flink-agents-api</artifactId> <name>Flink Agents : API</name> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-core</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + </project> \ No newline at end of file diff --git a/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java b/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java index f4bdb6b..ae7307b 100644 --- a/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java +++ b/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java @@ -18,6 +18,7 @@ package org.apache.flink.agents.api.context; import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.metrics.FlinkAgentsMetricGroup; /** * A context object used during action execution. It is responsible for collecting output events @@ -38,4 +39,18 @@ public interface RunnerContext { * @throws Exception if the underlying state backend cannot be accessed */ MemoryObject getShortTermMemory() throws Exception; + + /** + * Gets the metric group for Flink Agents. + * + * @return the metric group shared across all actions. + */ + FlinkAgentsMetricGroup getAgentMetricGroup(); + + /** + * Gets the individual metric group dedicated for each action. + * + * @return the individual metric group specific to the current action. + */ + FlinkAgentsMetricGroup getActionMetricGroup(); } diff --git a/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java b/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java new file mode 100644 index 0000000..9ab3df3 --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.api.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; + +/** + * Interface providing a metric group for action execution. This metric group offers access + * to various types of metrics. + */ +public interface FlinkAgentsMetricGroup { + + /** + * Create or retrieve a sub-metric group with the given name. + * + * @param name The name of the sub metric group. + * @return the sub-metric group instance. + */ + FlinkAgentsMetricGroup getSubGroup(String name); + + /** + * Create or retrieve a gauge with the given name. + * + * <p>Note: We use StringGauge here to ensure consistency across Python and Java interactions, + * avoiding potential type conflicts by standardizing on String as the value type. + * + * @param name The name of the gauge. + * @return the updatable gauge instance. + */ + UpdatableGauge getGauge(String name); + + /** + * Create or retrieve a counter with the given name. + * + * @param name The name of the counter. + * @return the counter instance. + */ + Counter getCounter(String name); + + /** + * Create or retrieve a meter with the given name. + * + * @param name The name of the meter. + * @return the meter instance. 
+ */ + Meter getMeter(String name); + + /** + * Create or retrieve a meter with the given name and associate it with the provided counter. + * + * @param name The name of the meter. + * @param counter The counter to associate with the meter. + * @return the meter instance. + */ + Meter getMeter(String name, Counter counter); + + /** + * Create or retrieve a histogram with the given name using the default window size. + * + * @param name The name of the histogram. + * @return the histogram instance. + */ + Histogram getHistogram(String name); + + /** + * Create or retrieve a histogram with the given name and specified window size. + * + * @param name The name of the histogram. + * @param windowSize The sliding window size for histogram statistics. + * @return the histogram instance. + */ + Histogram getHistogram(String name, int windowSize); +} diff --git a/api/src/main/java/org/apache/flink/agents/api/metrics/UpdatableGauge.java b/api/src/main/java/org/apache/flink/agents/api/metrics/UpdatableGauge.java new file mode 100644 index 0000000..43dd94b --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/metrics/UpdatableGauge.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.agents.api.metrics; + +import org.apache.flink.metrics.Gauge; + +/** + * UpdatableGauge interface defines a simple gauge that extends the Gauge interface. This interface + * provides a method to update the gauge value. + * + * @param <T> The type parameter of the gauge value + */ +public interface UpdatableGauge<T> extends Gauge<T> { + /** + * Updates the value of the UpdatableGauge. + * + * @param value The new value to be set. + */ + void update(T value); +} diff --git a/python/flink_agents/api/metric_group.py b/python/flink_agents/api/metric_group.py new file mode 100644 index 0000000..5f07aac --- /dev/null +++ b/python/flink_agents/api/metric_group.py @@ -0,0 +1,139 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from abc import ABC, abstractmethod + + +class MetricGroup(ABC): + """Abstract base class providing metric group for action execution. + + This metric group offers access to metrics. 
+ """ + + @abstractmethod + def get_sub_group(self, name: str) -> "MetricGroup": + """Create or retrieve a sub-metric group with the given name. + + Parameters + ---------- + name : str + The name of the sub metric group. + """ + + @abstractmethod + def get_counter(self, name: str) -> "Counter": + """Create or retrieve a counter with the given name. + + Parameters + ---------- + name : str + The name of the counter. + """ + + @abstractmethod + def get_meter(self, name: str) -> "Meter": + """Create or retrieve a meter with the given name. + + Parameters + ---------- + name : str + The name of the meter. + """ + + @abstractmethod + def get_gauge(self, name: str) -> "Gauge": + """Create or retrieve a gauge with the given name. + + Parameters + ---------- + name : str + The name of the gauge. + """ + + @abstractmethod + def get_histogram(self, name: str, window_size: int = 100) -> "Histogram": + """Create or retrieve a histogram with the given name and window size. + + Parameters + ---------- + name : str + The name of the histogram. + window_size : int, optional + The sliding window size for histogram statistics. + """ + + +class Counter(ABC): + """A Counter that measures a count.""" + + @abstractmethod + def inc(self, n: int = 1) -> None: + """Increment the current count by the given value.""" + + @abstractmethod + def dec(self, n: int = 1) -> None: + """Decrement the current count by the given value.""" + + @abstractmethod + def get_count(self) -> int: + """Return the current count.""" + + +class Meter(ABC): + """Metric for measuring throughput.""" + + @abstractmethod + def mark(self, n: int =1) -> None: + """Trigger the meter by the given value.""" + + @abstractmethod + def get_rate(self) -> float: + """Return the current event rate per second.""" + + +class Histogram(ABC): + """The histogram allows to record values and create histogram statistics + for the currently seen elements. 
+ """ + + @abstractmethod + def update(self, value: int) -> None: + """Update the histogram with the given value.""" + + @abstractmethod + def get_mean(self) -> float: + """Return the average value.""" + + @abstractmethod + def get_max(self) -> int: + """Return the maximum recorded value.""" + + @abstractmethod + def get_min(self) -> int: + """Return the minimum recorded value.""" + + +class Gauge(ABC): + """A gauge metric that returns a value by invoking a function.""" + + @abstractmethod + def update(self, value: float) -> None: + """Update the current value of the gauge.""" + + @abstractmethod + def get_value(self) -> float: + """Return the current value of the gauge.""" diff --git a/python/flink_agents/api/runner_context.py b/python/flink_agents/api/runner_context.py index a46da78..4042d90 100644 --- a/python/flink_agents/api/runner_context.py +++ b/python/flink_agents/api/runner_context.py @@ -19,6 +19,7 @@ from abc import ABC, abstractmethod from flink_agents.api.event import Event from flink_agents.api.memory_object import MemoryObject +from flink_agents.api.metric_group import MetricGroup from flink_agents.api.resource import Resource, ResourceType @@ -59,3 +60,23 @@ class RunnerContext(ABC): MemoryObject The root object of the short-term memory. """ + + @abstractmethod + def get_agent_metric_group(self) -> MetricGroup: + """Get the metric group for flink agents. + + Returns: + ------- + MetricGroup + The metric group shared across all actions. + """ + + @abstractmethod + def get_action_metric_group(self) -> MetricGroup: + """Get the individual metric group dedicated for each action. + + Returns: + ------- + MetricGroup + The individual metric group specific to the current action. 
+ """ diff --git a/python/flink_agents/runtime/flink_metric_group.py b/python/flink_agents/runtime/flink_metric_group.py new file mode 100644 index 0000000..3172d00 --- /dev/null +++ b/python/flink_agents/runtime/flink_metric_group.py @@ -0,0 +1,160 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from typing import Any + +from typing_extensions import override + +from flink_agents.api.metric_group import Counter, Gauge, Histogram, Meter, MetricGroup + + +class FlinkMetricGroup(MetricGroup): + """Implementation of MetricGroup for flink execution environment.""" + def __init__(self, j_metric_group: Any) -> None: + """Initialize a flink metric group with the given java metric group. + + Parameters + ---------- + j_metric_group : Any + Java metric group used to synchronize data with the Flink metric system. 
+ """ + self._j_metric_group = j_metric_group + + @override + def get_sub_group(self, name: str) -> "FlinkMetricGroup": + return FlinkMetricGroup(self._j_metric_group.getSubGroup(name)) + + @override + def get_counter(self, name: str) -> "FlinkCounter": + return FlinkCounter(self._j_metric_group.getCounter(name)) + + @override + def get_meter(self, name: str) -> "FlinkMeter": + return FlinkMeter(self._j_metric_group.getMeter(name)) + + @override + def get_histogram(self, name: str, window_size: int =100) -> "FlinkHistogram": + return FlinkHistogram(self._j_metric_group.getHistogram(name, window_size)) + + @override + def get_gauge(self, name: str) -> "FlinkGauge": + return FlinkGauge(self._j_metric_group.getGauge(name)) + +class FlinkCounter(Counter): + """Implementation of Counter for flink execution environment.""" + def __init__(self, j_counter: Any) -> None: + """Initialize a flink counter with the given java counter. + + Parameters + ---------- + j_counter : Any + Java counter used for measuring the count of events. + """ + self._j_counter = j_counter + + @override + def inc(self, n: int = 1) -> None: + """Increment the current count by the given value.""" + self._j_counter.inc(n) + + @override + def dec(self, n: int = 1) -> None: + """Decrement the current count by the given value.""" + self._j_counter.dec(n) + + @override + def get_count(self) -> int: + """Return the current count.""" + return self._j_counter.getCount() + +class FlinkMeter(Meter): + """Implementation of Meter for flink execution environment.""" + def __init__(self, j_meter: Any) -> None: + """Initialize a flink meter with the given java meter. + + Parameters + ---------- + j_meter : Any + Java meter used for measuring throughput. 
+ """ + self._j_meter = j_meter + + @override + def mark(self, n: int = 1) -> None: + """Mark the occurrence of n events.""" + self._j_meter.markEvent(n) + + @override + def get_rate(self) -> float: + """Return the current event rate per second.""" + return self._j_meter.getRate() + +class FlinkHistogram(Histogram): + """Implementation of Histogram for flink execution environment.""" + def __init__(self, j_histogram: Any) -> None: + """Initialize a flink histogram with the given java histogram. + + Parameters + ---------- + j_histogram : Any + Java histogram used for recording values and computing statistical + summaries. + """ + self._j_histogram = j_histogram + self._j_statistics = j_histogram.getStatistics() + + @override + def update(self, value: int) -> None: + """Record a new value into the histogram.""" + self._j_histogram.update(value) + + @override + def get_mean(self) -> float: + """Return the average value.""" + return self._j_statistics.getMean() + + @override + def get_max(self) -> int: + """Return the maximum recorded value.""" + return self._j_statistics.getMax() + + @override + def get_min(self) -> int: + """Return the minimum recorded value.""" + return self._j_statistics.getMin() + +class FlinkGauge(Gauge): + """Implementation of Gauge for flink execution environment.""" + def __init__(self, j_gauge: Any) -> None: + """Initialize a flink gauge with the given java gauge. + + Parameters + ---------- + j_gauge : Any + Java gauge for recording a string value. 
+ """ + self._j_gauge = j_gauge + + @override + def update(self, value: float) -> None: + """Update the gauge with the given value.""" + self._j_gauge.update(value) + + @override + def get_value(self) -> float: + """Return the current value of the gauge.""" + return self._j_gauge.getValue() diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py index ed2f0f6..316918f 100644 --- a/python/flink_agents/runtime/flink_runner_context.py +++ b/python/flink_agents/runtime/flink_runner_context.py @@ -25,6 +25,7 @@ from flink_agents.api.resource import Resource, ResourceType from flink_agents.api.runner_context import RunnerContext from flink_agents.plan.agent_plan import AgentPlan from flink_agents.runtime.flink_memory_object import FlinkMemoryObject +from flink_agents.runtime.flink_metric_group import FlinkMetricGroup class FlinkRunnerContext(RunnerContext): @@ -82,6 +83,28 @@ class FlinkRunnerContext(RunnerContext): err_msg = "Failed to get short-term memory of runner context" raise RuntimeError(err_msg) from e + @override + def get_agent_metric_group(self) -> FlinkMetricGroup: + """Get the metric group for flink agents. + + Returns: + ------- + FlinkMetricGroup + The metric group shared across all actions. + """ + return FlinkMetricGroup(self._j_runner_context.getAgentMetricGroup()) + + @override + def get_action_metric_group(self) -> FlinkMetricGroup: + """Get the individual metric group dedicated for each action. + + Returns: + ------- + FlinkMetricGroup + The individual metric group specific to the current action. 
+ """ + return FlinkMetricGroup(self._j_runner_context.getActionMetricGroup()) + def create_flink_runner_context(j_runner_context: Any, agent_plan_json: str) -> FlinkRunnerContext: """Used to create a FlinkRunnerContext Python object in Pemja environment.""" diff --git a/python/flink_agents/runtime/local_runner.py b/python/flink_agents/runtime/local_runner.py index d025386..6d251f8 100644 --- a/python/flink_agents/runtime/local_runner.py +++ b/python/flink_agents/runtime/local_runner.py @@ -25,6 +25,7 @@ from typing_extensions import override from flink_agents.api.agent import Agent from flink_agents.api.event import Event, InputEvent, OutputEvent from flink_agents.api.memory_object import MemoryObject +from flink_agents.api.metric_group import MetricGroup from flink_agents.api.resource import Resource, ResourceType from flink_agents.api.runner_context import RunnerContext from flink_agents.plan.agent_plan import AgentPlan @@ -115,6 +116,18 @@ class LocalRunnerContext(RunnerContext): """ return self._short_term_memory + @override + def get_agent_metric_group(self) -> MetricGroup: + # TODO: Support metric mechanism for local agent execution. + err_msg = "Metric mechanism is not supported for local agent execution yet." + raise NotImplementedError(err_msg) + + @override + def get_action_metric_group(self) -> MetricGroup: + # TODO: Support metric mechanism for local agent execution. + err_msg = "Metric mechanism is not supported for local agent execution yet." + raise NotImplementedError(err_msg) + class LocalRunner(AgentRunner): """Agent runner implementation for local execution, which is convenient for debugging. 
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java index 7f6ec22..75e2309 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java @@ -22,6 +22,7 @@ import org.apache.flink.agents.api.context.MemoryObject; import org.apache.flink.agents.api.context.RunnerContext; import org.apache.flink.agents.plan.utils.JsonUtils; import org.apache.flink.agents.runtime.memory.MemoryObjectImpl; +import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl; import org.apache.flink.api.common.state.MapState; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.util.Preconditions; @@ -38,8 +39,29 @@ public class RunnerContextImpl implements RunnerContext { protected final List<Event> pendingEvents = new ArrayList<>(); protected final MapState<String, MemoryObjectImpl.MemoryItem> store; - public RunnerContextImpl(MapState<String, MemoryObjectImpl.MemoryItem> store) { + protected final FlinkAgentsMetricGroupImpl agentMetricGroup; + + protected String actionName; + + public RunnerContextImpl( + MapState<String, MemoryObjectImpl.MemoryItem> store, + FlinkAgentsMetricGroupImpl agentMetricGroup) { this.store = store; + this.agentMetricGroup = agentMetricGroup; + } + + public void setActionName(String actionName) { + this.actionName = actionName; + } + + @Override + public FlinkAgentsMetricGroupImpl getAgentMetricGroup() { + return agentMetricGroup; + } + + @Override + public FlinkAgentsMetricGroupImpl getActionMetricGroup() { + return agentMetricGroup.getSubGroup(actionName); } @Override diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInActionMetrics.java 
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInActionMetrics.java new file mode 100644 index 0000000..2a518ef --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInActionMetrics.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Meter; + +/** + * BuiltInActionMetrics registers built-in metrics on a FlinkAgentsMetricGroupImpl to monitor and + * measure the performance metrics of executing actions. It provides metrics for the total number of + * actions executed and the number of actions executed per second. + */ +public class BuiltInActionMetrics { + + private final Meter numOfActionsExecutedPerSec; + + public BuiltInActionMetrics(FlinkAgentsMetricGroupImpl parentMetricGroup) { + Counter numOfActionsExecuted = parentMetricGroup.getCounter("numOfActionsExecuted"); + this.numOfActionsExecutedPerSec = + parentMetricGroup.getMeter("numOfActionsExecutedPerSec", numOfActionsExecuted); + } + + /** Marks that an action has finished executing. 
*/ + public void markActionExecuted() { + numOfActionsExecutedPerSec.markEvent(); + } +} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java new file mode 100644 index 0000000..0e96573 --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime.metrics; + +import org.apache.flink.agents.plan.AgentPlan; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Meter; + +import java.util.HashMap; + +/** + * Represents a group of built-in metrics for monitoring the performance and behavior of a flink + * agent job. This class is responsible for collecting and managing various metrics such as the + * number of events processed, the number of actions being executed, and the number of actions + * executed per second. 
+ */ +public class BuiltInMetrics { + + private final Meter numOfEventProcessedPerSec; + + private final Meter numOfActionsExecutedPerSec; + + private final HashMap<String, BuiltInActionMetrics> actionMetricGroups; + + public BuiltInMetrics(FlinkAgentsMetricGroupImpl parentMetricGroup, AgentPlan agentPlan) { + Counter numOfEventsProcessed = parentMetricGroup.getCounter("numOfEventProcessed"); + this.numOfEventProcessedPerSec = + parentMetricGroup.getMeter("numOfEventProcessedPerSec", numOfEventsProcessed); + + Counter numOfActionsExecuted = parentMetricGroup.getCounter("numOfActionsExecuted"); + this.numOfActionsExecutedPerSec = + parentMetricGroup.getMeter("numOfActionsExecutedPerSec", numOfActionsExecuted); + + this.actionMetricGroups = new HashMap<>(); + for (String actionName : agentPlan.getActions().keySet()) { + actionMetricGroups.put( + actionName, + new BuiltInActionMetrics(parentMetricGroup.getSubGroup(actionName))); + } + } + + /** Records the occurrence of an event, increasing the count of events processed per second. */ + public void markEventProcessed() { + numOfEventProcessedPerSec.markEvent(); + } + + /** + * Marks that an action has finished executing. Marks an event on the overall executed meter and + * delegates to the built-in metrics of the given action. + */ + public void markActionExecuted(String actionName) { + numOfActionsExecutedPerSec.markEvent(); + actionMetricGroups.get(actionName).markActionExecuted(); + } +} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java new file mode 100644 index 0000000..6b0a236 --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime.metrics; + +import org.apache.flink.agents.api.metrics.FlinkAgentsMetricGroup; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram; +import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup; + +import java.util.HashMap; + +/** + * Implementation of the FlinkAgentsMetricGroup interface, providing access to various types of + * metrics. This class extends ProxyMetricGroup and manages metrics such as counters, gauges, + * meters, and histograms. 
+ */ +public class FlinkAgentsMetricGroupImpl extends ProxyMetricGroup<MetricGroup> + implements FlinkAgentsMetricGroup { + + private final HashMap<String, FlinkAgentsMetricGroupImpl> subMetricGroups = new HashMap<>(); + + private final HashMap<String, Counter> counters = new HashMap<>(); + + private final HashMap<String, Meter> meters = new HashMap<>(); + + private final HashMap<String, Histogram> histograms = new HashMap<>(); + + private final HashMap<String, UpdatableGaugeImpl> gauges = new HashMap<>(); + + public FlinkAgentsMetricGroupImpl(MetricGroup parentMetricGroup) { + super(parentMetricGroup); + } + + public FlinkAgentsMetricGroupImpl getSubGroup(String name) { + if (!subMetricGroups.containsKey(name)) { + subMetricGroups.put(name, new FlinkAgentsMetricGroupImpl(super.addGroup(name))); + } + return subMetricGroups.get(name); + } + + public UpdatableGaugeImpl getGauge(String name) { + if (!gauges.containsKey(name)) { + gauges.put(name, super.gauge(name, new UpdatableGaugeImpl())); + } + return gauges.get(name); + } + + public Counter getCounter(String name) { + if (!counters.containsKey(name)) { + counters.put(name, super.counter(name)); + } + return counters.get(name); + } + + public Meter getMeter(String name) { + if (!meters.containsKey(name)) { + meters.put(name, super.meter(name, new MeterView(60))); + } + return meters.get(name); + } + + public Meter getMeter(String name, Counter counter) { + if (!meters.containsKey(name)) { + meters.put(name, super.meter(name, new MeterView(counter))); + } + return meters.get(name); + } + + public Histogram getHistogram(String name) { + if (!histograms.containsKey(name)) { + histograms.put(name, super.histogram(name, new DescriptiveStatisticsHistogram(100))); + } + return histograms.get(name); + } + + public Histogram getHistogram(String name, int windowSize) { + if (!histograms.containsKey(name)) { + histograms.put( + name, super.histogram(name, new DescriptiveStatisticsHistogram(windowSize))); + } + return 
histograms.get(name); + } +} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/UpdatableGaugeImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/UpdatableGaugeImpl.java new file mode 100644 index 0000000..1fd7297 --- /dev/null +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/UpdatableGaugeImpl.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime.metrics; + +import org.apache.flink.agents.api.metrics.UpdatableGauge; + +/** + * Implementation of the UpdatableGauge interface, used to monitor and track values. This class + * provides methods to set and retrieve the monitored value. 
+ * + * @param <T> The type of value being monitored + */ +public class UpdatableGaugeImpl<T> implements UpdatableGauge<T> { + private T value; + + @Override + public void update(T value) { + this.value = value; + } + + @Override + public T getValue() { + return value; + } +} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index a0d1dc6..cbc7e10 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -27,6 +27,8 @@ import org.apache.flink.agents.plan.PythonFunction; import org.apache.flink.agents.runtime.context.RunnerContextImpl; import org.apache.flink.agents.runtime.env.PythonEnvironmentManager; import org.apache.flink.agents.runtime.memory.MemoryObjectImpl; +import org.apache.flink.agents.runtime.metrics.BuiltInMetrics; +import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl; import org.apache.flink.agents.runtime.python.event.PythonEvent; import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor; import org.apache.flink.agents.runtime.utils.EventUtil; @@ -81,6 +83,10 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT // PythonActionExecutor for Python actions private transient PythonActionExecutor pythonActionExecutor; + private transient FlinkAgentsMetricGroupImpl metricGroup; + + private transient BuiltInMetrics builtInMetrics; + public ActionExecutionOperator( AgentPlan agentPlan, Boolean inputIsJava, ProcessingTimeService processingTimeService) { this.agentPlan = agentPlan; @@ -102,7 +108,11 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT TypeInformation.of(String.class), TypeInformation.of(MemoryObjectImpl.MemoryItem.class)); shortTermMemState = 
getRuntimeContext().getMapState(shortTermMemStateDescriptor); - runnerContext = new RunnerContextImpl(shortTermMemState); + + metricGroup = new FlinkAgentsMetricGroupImpl(getMetricGroup()); + builtInMetrics = new BuiltInMetrics(metricGroup, agentPlan); + + runnerContext = new RunnerContextImpl(shortTermMemState, metricGroup); // init PythonActionExecutor initPythonActionExecutor(); @@ -121,6 +131,7 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT events.push(inputEvent); while (!events.isEmpty()) { Event event = events.pop(); + builtInMetrics.markEventProcessed(); List<Action> actions = getActionsTriggeredBy(event); if (actions != null && !actions.isEmpty()) { for (Action action : actions) { @@ -130,22 +141,28 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT // TODO: Implement asynchronous action execution. // execute action and collect output events - LOG.debug("Try execute action {} for event {}.", action.getName(), event); + String actionName = action.getName(); + LOG.debug("Try execute action {} for event {}.", actionName, event); List<Event> actionOutputEvents; if (action.getExec() instanceof JavaFunction) { + runnerContext.setActionName(actionName); action.getExec().call(event, runnerContext); actionOutputEvents = runnerContext.drainEvents(); } else if (action.getExec() instanceof PythonFunction) { checkState(event instanceof PythonEvent); actionOutputEvents = pythonActionExecutor.executePythonFunction( - (PythonFunction) action.getExec(), (PythonEvent) event); + (PythonFunction) action.getExec(), + (PythonEvent) event, + actionName); } else { throw new RuntimeException("Unsupported action type: " + action.getClass()); } + builtInMetrics.markActionExecuted(actionName); for (Event actionOutputEvent : actionOutputEvents) { if (EventUtil.isOutputEvent(actionOutputEvent)) { + builtInMetrics.markEventProcessed(); OUT outputData = getOutputFromOutputEvent(actionOutputEvent); LOG.debug( "Collect 
output data {} for input {} in action {}.", @@ -190,7 +207,7 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT new PythonActionExecutor( pythonEnvironmentManager, new ObjectMapper().writeValueAsString(agentPlan)); - pythonActionExecutor.open(shortTermMemState); + pythonActionExecutor.open(shortTermMemState, metricGroup); } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java index 455208f..d9a20fc 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java @@ -22,6 +22,7 @@ import org.apache.flink.agents.api.context.MemoryObject; import org.apache.flink.agents.api.context.RunnerContext; import org.apache.flink.agents.runtime.context.RunnerContextImpl; import org.apache.flink.agents.runtime.memory.MemoryObjectImpl; +import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl; import org.apache.flink.agents.runtime.python.event.PythonEvent; import org.apache.flink.api.common.state.MapState; import org.apache.flink.util.Preconditions; @@ -32,8 +33,10 @@ import javax.annotation.concurrent.NotThreadSafe; @NotThreadSafe public class PythonRunnerContextImpl extends RunnerContextImpl { - public PythonRunnerContextImpl(MapState<String, MemoryObjectImpl.MemoryItem> store) { - super(store); + public PythonRunnerContextImpl( + MapState<String, MemoryObjectImpl.MemoryItem> store, + FlinkAgentsMetricGroupImpl agentMetricGroup) { + super(store, agentMetricGroup); } @Override diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java index 1dcdf5b..905ddb6 100644 --- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java @@ -22,6 +22,7 @@ import org.apache.flink.agents.plan.PythonFunction; import org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment; import org.apache.flink.agents.runtime.env.PythonEnvironmentManager; import org.apache.flink.agents.runtime.memory.MemoryObjectImpl; +import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl; import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl; import org.apache.flink.agents.runtime.python.event.PythonEvent; import org.apache.flink.agents.runtime.utils.EventUtil; @@ -59,7 +60,9 @@ public class PythonActionExecutor { this.agentPlanJson = agentPlanJson; } - public void open(MapState<String, MemoryObjectImpl.MemoryItem> shortTermMemState) + public void open( + MapState<String, MemoryObjectImpl.MemoryItem> shortTermMemState, + FlinkAgentsMetricGroupImpl metricGroup) throws Exception { environmentManager.open(); EmbeddedPythonEnvironment env = environmentManager.createEnvironment(); @@ -67,7 +70,7 @@ public class PythonActionExecutor { interpreter = env.getInterpreter(); interpreter.exec(PYTHON_IMPORTS); - runnerContext = new PythonRunnerContextImpl(shortTermMemState); + runnerContext = new PythonRunnerContextImpl(shortTermMemState, metricGroup); // TODO: remove the set and get runner context after updating pemja to version 0.5.3 Object pythonRunnerContextObject = @@ -75,9 +78,10 @@ public class PythonActionExecutor { interpreter.set(FLINK_RUNNER_CONTEXT_VAR_NAME, pythonRunnerContextObject); } - public List<Event> executePythonFunction(PythonFunction function, PythonEvent event) - throws Exception { + public List<Event> executePythonFunction( + PythonFunction function, PythonEvent event, String actionName) throws Exception { runnerContext.checkNoPendingEvents(); + 
runnerContext.setActionName(actionName); function.setInterpreter(interpreter); // TODO: remove the set and get runner context after updating pemja to version 0.5.3 diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java new file mode 100644 index 0000000..d58f8cd --- /dev/null +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** Tests for {@link FlinkAgentsMetricGroupImpl}. 
*/ +public class FlinkAgentsMetricGroupImplTest { + + private FlinkAgentsMetricGroupImpl metricGroup; + + @BeforeEach + void setUp() { + MetricGroup parentMetricGroup = + UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); + metricGroup = new FlinkAgentsMetricGroupImpl(parentMetricGroup); + } + + @Test + void testGetSubGroup() { + String name = "testGroup"; + FlinkAgentsMetricGroupImpl result = metricGroup.getSubGroup(name); + + assertNotNull(result); + assertEquals(result, metricGroup.getSubGroup(name)); + } + + @Test + void testGetGauge() { + String name = "testGauge"; + UpdatableGaugeImpl result = metricGroup.getGauge(name); + + assertNotNull(result); + assertSame(result, metricGroup.getGauge(name)); + } + + @Test + void testGetCounter() { + String name = "testCounter"; + Counter result = metricGroup.getCounter(name); + + assertNotNull(result); + assertEquals(result, metricGroup.getCounter(name)); + } + + @Test + void testGetMeterWithouCounter() { + String name = "testMeter"; + Meter result = metricGroup.getMeter(name); + + assertNotNull(result); + assertEquals(result, metricGroup.getMeter(name)); + } + + @Test + void testGetMeterWithCounter() { + String name = "testMeter"; + Counter counter = new SimpleCounter(); + + Meter result = metricGroup.getMeter(name, counter); + + assertNotNull(result); + assertEquals(result, metricGroup.getMeter(name)); + } + + @Test + void testGetHistogramWithoutWindowSize() { + String name = "testHistogram"; + Histogram result = metricGroup.getHistogram(name); + + assertNotNull(result); + assertEquals(result, metricGroup.getHistogram(name)); + } + + @Test + void testGetHistogramWithWindowSize() { + String name = "testHistogram"; + int windowSize = 200; + Histogram result = metricGroup.getHistogram(name, windowSize); + + assertNotNull(result); + assertEquals(result, metricGroup.getHistogram(name)); + } +} diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/UpdatableGaugeImplTest.java 
b/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/UpdatableGaugeImplTest.java new file mode 100644 index 0000000..da4519c --- /dev/null +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/UpdatableGaugeImplTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime.metrics; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** Tests for {@link UpdatableGaugeImpl}. */ +class UpdatableGaugeImplTest { + + /** Test updating and getting integer values. 
*/ + @Test + void testUpdateAndGetInteger() { + UpdatableGaugeImpl<Integer> intGauge = new UpdatableGaugeImpl<>(); + + // Test with positive value + intGauge.update(42); + assertEquals(42, intGauge.getValue(), "Should return the updated integer value"); + + // Test with zero + intGauge.update(0); + assertEquals(0, intGauge.getValue(), "Should return zero"); + + // Test with negative value + intGauge.update(-10); + assertEquals(-10, intGauge.getValue(), "Should return the negative integer value"); + + // Test with maximum value + intGauge.update(Integer.MAX_VALUE); + assertEquals(Integer.MAX_VALUE, intGauge.getValue(), "Should return Integer.MAX_VALUE"); + + // Test with minimum value + intGauge.update(Integer.MIN_VALUE); + assertEquals(Integer.MIN_VALUE, intGauge.getValue(), "Should return Integer.MIN_VALUE"); + } + + /** Test updating and getting double values. */ + @Test + void testUpdateAndGetDouble() { + UpdatableGaugeImpl<Double> doubleGauge = new UpdatableGaugeImpl<>(); + + // Test with positive value + doubleGauge.update(3.14); + assertEquals(3.14, doubleGauge.getValue(), 0.001, "Should return the updated double value"); + + // Test with zero + doubleGauge.update(0.0); + assertEquals(0.0, doubleGauge.getValue(), 0.001, "Should return zero"); + + // Test with negative value + doubleGauge.update(-2.5); + assertEquals( + -2.5, doubleGauge.getValue(), 0.001, "Should return the negative double value"); + + // Test with maximum value + doubleGauge.update(Double.MAX_VALUE); + assertEquals( + Double.MAX_VALUE, doubleGauge.getValue(), 0.001, "Should return Double.MAX_VALUE"); + + // Test with minimum value + doubleGauge.update(Double.MIN_VALUE); + assertEquals( + Double.MIN_VALUE, doubleGauge.getValue(), 0.001, "Should return Double.MIN_VALUE"); + } + + /** Test updating and getting float values. 
*/ + @Test + void testUpdateAndGetFloat() { + UpdatableGaugeImpl<Float> floatGauge = new UpdatableGaugeImpl<>(); + + // Test with positive value + floatGauge.update(2.5f); + assertEquals(2.5f, floatGauge.getValue(), 0.001f, "Should return the updated float value"); + + // Test with zero + floatGauge.update(0.0f); + assertEquals(0.0f, floatGauge.getValue(), 0.001f, "Should return zero"); + + // Test with negative value + floatGauge.update(-1.5f); + assertEquals( + -1.5f, floatGauge.getValue(), 0.001f, "Should return the negative float value"); + + // Test with maximum value + floatGauge.update(Float.MAX_VALUE); + assertEquals( + Float.MAX_VALUE, floatGauge.getValue(), 0.001f, "Should return Float.MAX_VALUE"); + + // Test with minimum value + floatGauge.update(Float.MIN_VALUE); + assertEquals( + Float.MIN_VALUE, floatGauge.getValue(), 0.001f, "Should return Float.MIN_VALUE"); + } + + /** Test updating and getting long values. */ + @Test + void testUpdateAndGetLong() { + UpdatableGaugeImpl<Long> longGauge = new UpdatableGaugeImpl<>(); + + // Test with positive value + longGauge.update(10000000000L); + assertEquals(10000000000L, longGauge.getValue(), "Should return the updated long value"); + + // Test with zero + longGauge.update(0L); + assertEquals(0L, longGauge.getValue(), "Should return zero"); + + // Test with negative value + longGauge.update(-5000000000L); + assertEquals(-5000000000L, longGauge.getValue(), "Should return the negative long value"); + + // Test with maximum value + longGauge.update(Long.MAX_VALUE); + assertEquals(Long.MAX_VALUE, longGauge.getValue(), "Should return Long.MAX_VALUE"); + + // Test with minimum value + longGauge.update(Long.MIN_VALUE); + assertEquals(Long.MIN_VALUE, longGauge.getValue(), "Should return Long.MIN_VALUE"); + } + + /** Test initial state. */ + @Test + void testInitialState() { + UpdatableGaugeImpl<Integer> intGauge = new UpdatableGaugeImpl<>(); + assertNull(intGauge.getValue(), "Initial value should be null"); + } +}