codenohup commented on code in PR #74: URL: https://github.com/apache/flink-agents/pull/74#discussion_r2224148535
########## python/flink_agents/runtime/local_runner.py: ########## @@ -96,6 +97,18 @@ def send_event(self, event: Event) -> None: def get_resource(self, name: str, type: ResourceType) -> Resource: return self.__agent_plan.get_resource(name, type) + @override + def get_agent_metric_group(self) -> MetricGroup: + # TODO: Support metric mechanism for local agent execution. Review Comment: It is strange that using metrics with LocalRunner throws an error; it implies that an agent needs two different code versions — one for use with _LocalRunner_ and another for _FlinkRunner_. ########## runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java: ########## @@ -91,6 +98,12 @@ public void open() throws Exception { this.runnerContext = new RunnerContextImpl(); + metricGroup = new FlinkAgentsMetricGroupImpl(getMetricGroup()); + + builtInMetricGroup = new BuiltInMetricGroup(metricGroup, agentPlan); + + runnerContext.setAgentMetricGroup(metricGroup); Review Comment: It looks like RunnerContext and MetricGroup are created at the same time. Why isn't MetricGroup assigned to RunnerContext through its constructor? ########## runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.runtime.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** Tests for {@link FlinkAgentsMetricGroupImpl}. */ +public class FlinkAgentsMetricGroupImplTest { Review Comment: Typically, we do not write unit tests for an implementation class directly. ########## python/flink_agents/runtime/flink_metric_group.py: ########## @@ -0,0 +1,160 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################# +from typing import Any + +from typing_extensions import override + +from flink_agents.api.metric_group import Counter, Gauge, Histogram, Meter, MetricGroup + + +class FlinkMetricGroup(MetricGroup): + """Implementation of MetricGroup for flink execution environment.""" + def __init__(self, j_metric_group: Any) -> None: + """Initialize a flink runner context with the given java runner context. + + Parameters + ---------- + j_runner_context : Any + Java runner context used to synchronize data between Python and Java. + """ + self._j_metric_group = j_metric_group + + @override + def get_sub_group(self, name: str) -> "FlinkMetricGroup": + return FlinkMetricGroup(self._j_metric_group.getSubGroup(name)) + + @override + def get_counter(self, name: str) -> "FlinkCounter": + return FlinkCounter(self._j_metric_group.getCounter(name)) + + @override + def get_meter(self, name: str) -> "FlinkMeter": + return FlinkMeter(self._j_metric_group.getMeter(name)) + + @override + def get_histogram(self, name: str, window_size: int =100) -> "FlinkHistogram": + return FlinkHistogram(self._j_metric_group.getHistogram(name, window_size)) + + @override + def get_gauge(self, name: str) -> "FlinkGauge": + return FlinkGauge(self._j_metric_group.getGauge(name)) + +class FlinkCounter(Counter): + """Implementation of Counter for flink execution environment.""" + def __init__(self, j_counter: Any) -> None: + """Initialize a flink runner context with the given java runner context. + + Parameters + ---------- + j_counter : Any + Java counter used for measuring the count of events. 
+ """ + self._j_counter = j_counter + + @override + def inc(self, n: int = 1) -> None: + """Increment the current count by the given value.""" + self._j_counter.inc(n) + + @override + def dec(self, n: int = 1) -> None: + """Decrement the current count by the given value.""" + self._j_counter.dec(n) + + @override + def get_count(self) -> int: + """Return the current count.""" + return self._j_counter.getCount() + +class FlinkMeter(Meter): + """Implementation of Meter for flink execution environment.""" + def __init__(self, j_meter: Any) -> None: + """Initialize a flink meter with the given java meter. + + Parameters + ---------- + j_meter : Any + Java meter measures throughput. + """ + self._j_meter = j_meter + + @override + def mark(self, n: int = 1) -> None: + """Mark the occurrence of n events.""" + self._j_meter.markEvent(n) + + @override + def get_rate(self) -> float: + """Return the current event rate per second.""" + return self._j_meter.getRate() + +class FlinkHistogram(Histogram): + """Implementation of Histogram for flink execution environment.""" + def __init__(self, j_histogram: Any) -> None: + """Initialize a flink histogram with the given java histogram. + + Parameters + ---------- + j_histogram : Any + Java histogram used for recording values and computing statistical + summaries. 
+ """ + self._j_histogram = j_histogram + self._j_statistics = j_histogram.getStatistics() + + @override + def update(self, value: int) -> None: + """Record a new value into the histogram.""" + self._j_histogram.update(value) + + @override + def get_mean(self) -> float: + """Return the average value.""" + return self._j_statistics.getMean() + + @override + def get_max(self) -> int: + """Return the maximum recorded value.""" + return self._j_statistics.getMax() + + @override + def get_min(self) -> int: + """Return the minimum recorded value.""" + return self._j_statistics.getMin() + +class FlinkGauge(Gauge): + """Implementation of Gauge for flink execution environment.""" + def __init__(self, j_gauge: Any) -> None: + """Initialize a flink gauge with the given java gauge. + + Parameters + ---------- + j_gauge : Any + Java gauge for recording a string value. + """ + self._j_gauge = j_gauge + + @override + def update(self, value: str) -> None: Review Comment: Can the parameter type be Any, and then converted to a string internally? This would allow users to directly call something like _flink_gauge.update(0.05f)_. ########## api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.agents.api.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; + +/** + * Abstract base class providing metric group for action execution. This metric group offers access + * to various types of metrics. + */ +public interface FlinkAgentsMetricGroup { + + /** + * Create or retrieve a sub-metric group with the given name. + * + * @param name The name of the sub metric group. + * @return the sub-metric group instance. + */ + FlinkAgentsMetricGroup getSubGroup(String name); + + /** + * Create or retrieve a gauge with the given name. + * + * @param name The name of the gauge. + * @return the gauge instance for string values. + */ + StringGauge getGauge(String name); Review Comment: It would be helpful to provide an explanation for why only StringGauge is currently supported. 
########## runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java: ########## @@ -71,9 +77,11 @@ public void open() throws Exception { interpreter.set(FLINK_RUNNER_CONTEXT_VAR_NAME, pythonRunnerContextObject); } - public List<Event> executePythonFunction(PythonFunction function, PythonEvent event) + public List<Event> executePythonFunction( + PythonFunction function, PythonEvent event, ActionMetricGroup actionMetricGroup) throws Exception { runnerContext.checkNoPendingEvents(); + runnerContext.setActionMetricGroup(actionMetricGroup); Review Comment: Repeated with line 64 ########## runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java: ########## @@ -108,6 +121,7 @@ public void processElement(StreamRecord<IN> record) throws Exception { events.push(inputEvent); while (!events.isEmpty()) { Event event = events.pop(); + builtInMetricGroup.markEventOccurred(); Review Comment: Should we distinguish between the event generation time and the processing time? ########## runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.agents.runtime.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** Tests for {@link FlinkAgentsMetricGroupImpl}. */ +public class FlinkAgentsMetricGroupImplTest { + + private FlinkAgentsMetricGroupImpl metricGroup; + + @BeforeEach + public void setUp() { + MetricGroup parentMetricGroup = + UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); + metricGroup = new FlinkAgentsMetricGroupImpl(parentMetricGroup); + } + + @Test + public void testGetSubGroup() { Review Comment: In JUnit 5, it seems that test classes and test methods do not need to be public. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org