weiqingy commented on code in PR #860:
URL: https://github.com/apache/flink-agents/pull/860#discussion_r3502149902
##########
runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java:
##########
@@ -200,6 +203,11 @@ public Object callMethod(Object obj, String methodName, Map<String, Object> kwar
return interpreter.invoke(CALL_METHOD, obj, methodName, kwargs);
}
+ @Override
+ public void setMetricGroup(Object pythonResource, FlinkAgentsMetricGroup metricGroup) {
+ interpreter.invoke(SET_METRIC_GROUP, pythonResource, metricGroup);
Review Comment:
This now forwards the action metric group through to the Python provider,
which is exactly what issue #857 asked for — and #857 also called out "define
metric ownership clearly to avoid double-counting when action-level and
provider-level metrics both observe the same response." From what I can see,
ownership already looks singular: token metrics are recorded in exactly one
place per language — the action handler
(`ChatModelAction.recordChatTokenMetrics` → `chatModel.recordTokenMetrics` on
the Java side, `chat_model_action` → `_record_token_metrics` on the Python
side), each reading the resource's bound group. The provider integrations under
`integrations/` record nothing, so there's no second recorder to collide with —
this PR widens that single existing recording across the language boundary
rather than introducing a second one. So the #857 concern reads as already
closed by construction. Would it be worth a sentence in the PR description
making that explicit, so the "no double-counting" reasoning is traceable back
to the issue?
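To make the single-owner argument concrete, here is a minimal Python sketch of the pattern described above. Every class and function name in it is a hypothetical stand-in, not the flink-agents API. In the sketch, the provider returns usage but records nothing. Only the action handler triggers recording, and recording reads whatever group is bound to the resource. Forwarding the group across the language boundary changes *which* group is bound, not *how many* recorders exist.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Counter:
    """Hypothetical stand-in for a metric counter."""
    count: int = 0

    def inc(self, n: int = 1) -> None:
        self.count += n


@dataclass
class MetricGroup:
    """Hypothetical stand-in for the action metric group."""
    counters: Dict[str, Counter] = field(default_factory=dict)

    def get_counter(self, name: str) -> Counter:
        return self.counters.setdefault(name, Counter())


class ProviderChatModel:
    """Provider integration: reports usage, never records metrics on its own."""

    def __init__(self) -> None:
        self.metric_group: Optional[MetricGroup] = None  # bound by the runtime

    def chat(self, prompt: str) -> Dict[str, int]:
        # Pretend the provider reported token usage for this response.
        return {"prompt_tokens": len(prompt.split()), "completion_tokens": 3}

    def record_token_metrics(self, usage: Dict[str, int]) -> None:
        # Reads the resource's bound group; only the action handler calls this.
        if self.metric_group is None:
            return
        for name, value in usage.items():
            self.metric_group.get_counter(name).inc(value)


def chat_model_action(model: ProviderChatModel, prompt: str) -> Dict[str, int]:
    """Action handler: the single owner of token-metric recording."""
    usage = model.chat(prompt)
    model.record_token_metrics(usage)  # the one and only recording site
    return usage
```

Because the provider's `chat` never touches the group, binding a different (forwarded) group cannot produce a second count for the same response.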
##########
api/src/main/java/org/apache/flink/agents/api/resource/Resource.java:
##########
@@ -60,6 +61,9 @@ public ResourceContext getResourceContext() {
*/
public void setMetricGroup(FlinkAgentsMetricGroup metricGroup) {
this.metricGroup = metricGroup;
+ if (this instanceof PythonResourceWrapper) {
+ ((PythonResourceWrapper) this).setPythonResourceMetricGroup(metricGroup);
Review Comment:
The abstract base `Resource` (package `...api.resource`) now imports and
`instanceof`-checks `PythonResourceWrapper` from the more specialized
`...api.resource.python` sub-package. That inverts the usual dependency
direction — every Java-only resource (chat models, tools, prompts, vector
stores) now carries a compile-time edge to the Python-bridge interface in its
base type, and a hypothetical third forwarding flavor would mean editing this
base method again rather than overriding it.
One option that keeps the base oblivious to the bridge: have each `Python*`
wrapper override `setMetricGroup` to call `super.setMetricGroup(...)` and then
`setPythonResourceMetricGroup(...)`, pushing the Python concern down into the
wrappers where `getPythonResourceAdapter()` already lives. The PR already adds
a new `getPythonResourceAdapter()` override to all eight wrappers, so a
per-wrapper `setMetricGroup` override would touch the same set of files, just
in the Python-bridge wrappers instead of the base class. Was the centralized `instanceof`
chosen deliberately, or mainly to avoid touching the wrappers?
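The shape of that alternative, sketched in Python for brevity (it carries over directly to the Java classes). `Resource`, `PythonResourceAdapter`, `PythonChatModelWrapper` and `PlainTool` are simplified, hypothetical stand-ins, not the real signatures. The base class only stores the group. The bridge-aware subclass overrides the setter, calls `super()`, and then forwards.

```python
from typing import Any, List, Optional, Tuple


class Resource:
    """Base type: stores the group, knows nothing about the Python bridge."""

    def __init__(self) -> None:
        self.metric_group: Optional[Any] = None

    def set_metric_group(self, metric_group: Any) -> None:
        self.metric_group = metric_group


class PythonResourceAdapter:
    """Hypothetical adapter; records what it would forward to Python."""

    def __init__(self) -> None:
        self.forwarded: List[Tuple[Any, Any]] = []

    def set_metric_group(self, py_resource: Any, metric_group: Any) -> None:
        self.forwarded.append((py_resource, metric_group))


class PythonChatModelWrapper(Resource):
    """One Python* wrapper: it, not the base class, owns the forwarding."""

    def __init__(self, py_resource: Any, adapter: PythonResourceAdapter) -> None:
        super().__init__()
        self._py_resource = py_resource
        self._adapter = adapter

    def set_metric_group(self, metric_group: Any) -> None:
        super().set_metric_group(metric_group)  # keep base-class bookkeeping
        self._adapter.set_metric_group(self._py_resource, metric_group)


class PlainTool(Resource):
    """A non-bridge resource: inherits the base setter, no bridge dependency."""
```

A third forwarding flavor would then be one more subclass override, and `Resource` would never need to change.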
##########
python/flink_agents/runtime/tests/test_cross_language_metric_group.py:
##########
@@ -0,0 +1,86 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any
+
+import pytest
+
+from flink_agents.runtime.java.java_chat_model import (
+ JavaChatModelConnectionImpl,
+ JavaChatModelSetupImpl,
+)
+from flink_agents.runtime.java.java_embedding_model import (
+ JavaEmbeddingModelConnectionImpl,
+ JavaEmbeddingModelSetupImpl,
+)
+from flink_agents.runtime.java.java_resource_wrapper import (
+ set_java_resource_metric_group,
+)
+
+
+class _JavaResource:
+ def __init__(self) -> None:
+ self.metric_group: Any = None
+
+ def setMetricGroup(self, metric_group: Any) -> None:
+ self.metric_group = metric_group
+
+
+class _MetricGroup:
+ def __init__(self) -> None:
+ self._j_metric_group = object()
+
+
+@pytest.mark.parametrize(
+ "resource",
+ [
+ JavaChatModelConnectionImpl(
+ j_resource=_JavaResource(), j_resource_adapter=None
+ ),
+ JavaChatModelSetupImpl(
+ j_resource=_JavaResource(),
+ j_resource_adapter=None,
+ connection="connection",
+ model="model",
+ ),
+ JavaEmbeddingModelConnectionImpl(
+ j_resource=_JavaResource(), j_resource_adapter=None
+ ),
+ JavaEmbeddingModelSetupImpl(
+ j_resource=_JavaResource(),
+ j_resource_adapter=None,
+ connection="connection",
+ model="model",
+ ),
+ ],
+)
+def test_java_resource_wrappers_forward_metric_group(resource):
Review Comment:
This new file covers the Python→Java direction well, but only that direction
— `set_java_resource_metric_group` plus the four `Java*Impl` wrappers. The
Java→Python direction's Python half, `python_java_utils.set_metric_group`
(`python_java_utils.py:382`), has no direct test: it's the function that wraps
the raw Java group in `FlinkMetricGroup` and owns the only `None` branch in the
feature, and the Java half is mock-verify only (`testSetMetricGroup` asserts
the invoke string, it doesn't execute Python). So that seam isn't exercised
end-to-end today.
Would a small unit test in this file's style be worth adding — a fake
resource that captures its `set_metric_group` arg, then `set_metric_group(fake,
sentinel_j_group)` asserting the captured arg is a `FlinkMetricGroup` whose
`_j_metric_group is sentinel_j_group`, plus a `None` case asserting `None` is
forwarded? That would pin the wrap-and-None behavior that's currently uncovered.
##########
python/flink_agents/runtime/java/java_resource_wrapper.py:
##########
@@ -27,6 +27,14 @@
from flink_agents.api.tools.tool import Tool, ToolMetadata, ToolType
+def set_java_resource_metric_group(j_resource: Any, metric_group: Any) -> None:
+ """Bind the underlying Java metric group to a wrapped Java resource."""
+ if j_resource is None:
+ return
+ j_metric_group = getattr(metric_group, "_j_metric_group", metric_group)
Review Comment:
In the real flow this is correct — `metric_group` is always a
`FlinkMetricGroup` (it comes from `action_metric_group`), so the unwrap hits
`_j_metric_group` and forwards the genuine Java group. I have low confidence
that the edge case below matters in practice, so this is genuinely a question
rather than a flag.
`RunnerContext.get_resource(name, type, metric_group=...)` is a documented
public API accepting any `MetricGroup` — an abstract base a user could subclass
without a `_j_metric_group`. If such a custom group reaches a Java resource
wrapper, the silent fallback (the `metric_group` default passed to `getattr`) would forward a raw Python object
into Java `setMetricGroup(FlinkAgentsMetricGroup)`, failing opaquely at the
pemja boundary rather than with a clear error. Worth making the fallback
explicit (unwrap only a real `FlinkMetricGroup`, else fail loudly), or is that
out of scope today given no in-tree caller hits it? (The `None` case is benign
— `get_resource` substitutes `action_metric_group` first, and a direct `None`
just clears the field — so no concern there.)
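If the stricter behavior is wanted, one possible shape is sketched below. It stubs `FlinkMetricGroup`. The final `j_resource.setMetricGroup(...)` call is an assumption inferred from the `_JavaResource` fake in the test file, because this hunk does not show the rest of the function body.

```python
from typing import Any


class FlinkMetricGroup:
    """Stub: assumes the real class stores the Java group in `_j_metric_group`."""

    def __init__(self, j_metric_group: Any) -> None:
        self._j_metric_group = j_metric_group


def set_java_resource_metric_group(j_resource: Any, metric_group: Any) -> None:
    """Bind the underlying Java metric group to a wrapped Java resource."""
    if j_resource is None:
        return
    if metric_group is None:
        j_metric_group = None  # benign: clears the field on the Java side
    elif isinstance(metric_group, FlinkMetricGroup):
        j_metric_group = metric_group._j_metric_group
    else:
        # Fail here with a readable message instead of opaquely at pemja.
        msg = (
            "Java resources need a FlinkMetricGroup, got "
            f"{type(metric_group).__name__}"
        )
        raise TypeError(msg)
    # Assumed to match the existing function's final call.
    j_resource.setMetricGroup(j_metric_group)
```

The `None` path stays permissive, matching the point above that clearing the field is harmless. Only an unrecognized group type is rejected.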
--
This is an automated message from the Apache Git Service.