weiqingy commented on code in PR #852:
URL: https://github.com/apache/flink-agents/pull/852#discussion_r3424600676


##########
api/src/test/java/org/apache/flink/agents/api/chat/model/routing/RoutingTestSupport.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.chat.model.routing;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.chat.model.BaseChatModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceContext;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** Shared fakes for the routing tests. */
+final class RoutingTestSupport {
+
+    private RoutingTestSupport() {}
+
+    static ResourceDescriptor emptyDescriptor(Class<?> clazz) {
+        return new ResourceDescriptor(clazz.getName(), Collections.emptyMap());
+    }
+
+    /** A chat model that records the messages it received and answers with its own tag. */
+    static final class RecordingModel extends BaseChatModelSetup {
+        final String tag;
+        int callCount = 0;
+        List<ChatMessage> lastMessages;
+
+        RecordingModel(String tag) {
+            super(emptyDescriptor(RecordingModel.class), null);
+            this.tag = tag;
+        }
+
+        @Override
+        public Map<String, Object> getParameters() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public ChatMessage chat(
+                List<ChatMessage> messages,
+                Map<String, Object> promptArgs,
+                Map<String, Object> modelParams) {
+            this.callCount++;
+            this.lastMessages = new ArrayList<>(messages);
+            return new ChatMessage(MessageRole.ASSISTANT, "handled-by:" + tag);
+        }
+    }
+
+    /** A chat model that always fails — used to exercise fallback behavior. */
+    static final class FailingModel extends BaseChatModelSetup {
+        int callCount = 0;
+
+        FailingModel() {
+            super(emptyDescriptor(FailingModel.class), null);
+        }
+
+        @Override
+        public Map<String, Object> getParameters() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public ChatMessage chat(
+                List<ChatMessage> messages,
+                Map<String, Object> promptArgs,
+                Map<String, Object> modelParams) {
+            this.callCount++;
+            throw new RuntimeException("boom from failing model");
+        }
+    }
+
+    /** A chat model that returns a scripted reply — used as an LLM-as-router judge. */
+    static final class ScriptedJudge extends BaseChatModelSetup {
+        final String reply;
+        int callCount = 0;
+        List<ChatMessage> lastMessages;
+
+        ScriptedJudge(String reply) {
+            super(emptyDescriptor(ScriptedJudge.class), null);
+            this.reply = reply;
+        }
+
+        @Override
+        public Map<String, Object> getParameters() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public ChatMessage chat(
+                List<ChatMessage> messages,
+                Map<String, Object> promptArgs,
+                Map<String, Object> modelParams) {
+            this.callCount++;
+            this.lastMessages = new ArrayList<>(messages);
+            return new ChatMessage(MessageRole.ASSISTANT, reply);
+        }
+    }
+
+    /** A {@link ResourceContext} backed by a fixed name → resource map. */
+    static ResourceContext context(Map<String, Resource> byName) {
+        return ResourceContext.fromGetResource(

Review Comment:
   The fakes resolve through `ResourceContext.fromGetResource(...)`, which 
returns the registered resource directly — unlike the runtime 
`ResourceCache.getResource()`, which calls `open()` on the resolved resource 
before handing it back (`ResourceCache.java:125`). The upshot is that the 
load-bearing production invariant — a routed candidate gets `open()`-ed, so its 
`connection` is non-null before `chat()` — isn't exercised by any unit test; it 
rests on the `ResourceCache` path plus the e2e example. That's a structural 
property of resolving fakes without a real cache, not a defect in this PR. Does 
the e2e `LlmRoutingAgentExample` exercise the real-candidate 
(`open()`-then-`chat()`) path in CI, so the invariant is covered somewhere? If 
it doesn't, would a single router test backed by a real `ResourceCache` and a 
candidate that actually needs `open()` be worth adding, to pin the one fact the 
whole drop-in design depends on?
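
   To make the gap concrete, here is a minimal, self-contained sketch. It
assumes only what is described above (a direct lookup hands back the registered
instance, while a cache-style lookup calls `open()` first). `ToyModel`,
`directLookup` and `openingLookup` are hypothetical stand-ins, not the
flink-agents classes or their signatures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model of the two resolution paths; none of these types are flink-agents classes. */
public class OpenInvariantSketch {

    /** Hypothetical resource that needs open() before chat(). */
    static final class ToyModel {
        private String connection; // stays null until open() runs

        void open() {
            connection = "connected";
        }

        String chat(String prompt) {
            if (connection == null) {
                throw new IllegalStateException("chat() called before open()");
            }
            return "reply:" + prompt;
        }
    }

    /** Mimics a fake context: returns the registered instance untouched. */
    static Function<String, ToyModel> directLookup(Map<String, ToyModel> byName) {
        return byName::get;
    }

    /** Mimics a cache that opens the resource before returning it (assumed behavior). */
    static Function<String, ToyModel> openingLookup(Map<String, ToyModel> byName) {
        return name -> {
            ToyModel model = byName.get(name);
            model.open();
            return model;
        };
    }

    private static boolean chatSucceeds(Function<String, ToyModel> lookup) {
        try {
            lookup.apply("m").chat("hi");
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    /** Resolves an unopened model through the direct path and tries to chat. */
    static boolean directPathChats() {
        Map<String, ToyModel> byName = new HashMap<>();
        byName.put("m", new ToyModel());
        return chatSucceeds(directLookup(byName));
    }

    /** Resolves an unopened model through the opening path and tries to chat. */
    static boolean openingPathChats() {
        Map<String, ToyModel> byName = new HashMap<>();
        byName.put("m", new ToyModel());
        return chatSucceeds(openingLookup(byName));
    }

    public static void main(String[] args) {
        System.out.println("direct path chats:  " + directPathChats());
        System.out.println("opening path chats: " + openingPathChats());
    }
}
```

   A fake whose `chat()` never touches a connection passes on either path,
which is why only a candidate that actually needs `open()` would pin the
invariant.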



##########
api/src/main/java/org/apache/flink/agents/api/chat/model/routing/RoutingCandidate.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.chat.model.routing;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A routable target of a {@link ChatModelRouter}.
+ *
+ * <p>A candidate names a chat-model setup that was registered as a {@link
+ * org.apache.flink.agents.api.resource.ResourceType#CHAT_MODEL} resource, together with an optional
+ * human-readable {@code description} (consumed by LLM-as-router strategies to reason about which
+ * model fits a request) and free-form {@code metadata} (e.g. {@code cost}, {@code tags},
+ * capabilities) that rule-based or custom strategies can match against.
+ */
+public class RoutingCandidate {
+
+    private final String name;
+    private final String description;
+    private final Map<String, Object> metadata;
+
+    public RoutingCandidate(String name, String description, Map<String, Object> metadata) {
+        this.name = Objects.requireNonNull(name, "candidate name must not be null");

Review Comment:
   The ctor null-checks `name` but lets an empty string through. In 
`LlmRoutingStrategy.parseChoice`, the whole-token regex built from an empty 
quoted name then matches at almost any boundary, which mis-routes the request. 
It takes a pathological config (an empty candidate name) to hit, so this is low 
priority. A one-line `name.isEmpty()` guard alongside the `requireNonNull` 
closes it cleanly.
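
   A rough sketch of the guard, plus an illustration of why an empty name 
over-matches. `CandidateNameGuardSketch` and the pattern in `mentions` are 
hypothetical: the pattern is an assumed whole-token matcher written for this 
example, not the regex `parseChoice` actually uses.

```java
import java.util.Objects;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the candidate class; only the name check is modeled. */
public final class CandidateNameGuardSketch {

    private final String name;

    public CandidateNameGuardSketch(String name) {
        Objects.requireNonNull(name, "candidate name must not be null");
        if (name.isEmpty()) {
            // The extra guard: reject empty names at construction time.
            throw new IllegalArgumentException("candidate name must not be empty");
        }
        this.name = name;
    }

    public String getName() {
        return name;
    }

    /**
     * Assumed whole-token matcher (NOT the real parseChoice regex): the quoted name must not
     * touch a word character on either side.
     */
    static boolean mentions(String reply, String candidateName) {
        Pattern pattern =
                Pattern.compile("(?<!\\w)" + Pattern.quote(candidateName) + "(?!\\w)");
        return pattern.matcher(reply).find();
    }

    public static void main(String[] args) {
        // A real name that is absent from the reply does not match.
        System.out.println(mentions("pick: fast", "slow"));
        // An empty name matches between ':' and ' ', although no name was mentioned.
        System.out.println(mentions("pick: fast", ""));
        try {
            new CandidateNameGuardSketch("");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```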



##########
api/src/main/java/org/apache/flink/agents/api/chat/model/routing/CachingStrategy.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.chat.model.routing;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A {@link RoutingStrategy} decorator that memoizes the wrapped strategy's decision per
+ * conversation (keyed on {@link RoutingContext#firstUserMessage()}), so an expensive selection —
+ * e.g. an LLM judge — runs once per conversation rather than on every tool-call round.
+ *
+ * <p>The cache is a <b>bounded LRU</b> with real eviction (oldest entries are dropped past the
+ * capacity), so it never grows without bound and never silently stops caching. Empty keys (requests
+ * with no user message) are not cached, to avoid coupling unrelated empty-prompt conversations. A
+ * {@code null} decision (the strategy abstaining — e.g. a transient LLM-judge failure) is likewise
+ * not cached, so the strategy is re-consulted on the next round rather than pinned to a fallback.
+ * Thread-safe for the async execution pool.
+ */
+public final class CachingStrategy implements RoutingStrategy {
+
+    /** Default cache capacity if none is specified. */
+    public static final int DEFAULT_MAX_ENTRIES = 1024;
+
+    private final RoutingStrategy delegate;
+    private final Map<String, String> cache;
+
+    public CachingStrategy(RoutingStrategy delegate) {
+        this(delegate, DEFAULT_MAX_ENTRIES);
+    }
+
+    public CachingStrategy(RoutingStrategy delegate, int maxEntries) {
+        if (delegate == null) {
+            throw new IllegalArgumentException("delegate strategy must not be null");
+        }
+        if (maxEntries <= 0) {
+            throw new IllegalArgumentException("maxEntries must be positive: " + maxEntries);
+        }
+        this.delegate = delegate;
+        this.cache = Collections.synchronizedMap(new LruMap(maxEntries));
+    }
+
+    @Override
+    public String route(RoutingContext context) throws Exception {
+        String key = context.firstUserMessage();
+        if (key.isEmpty()) {
+            // Don't cache empty keys: every empty-prompt conversation would otherwise share one
+            // decision. Recompute each time instead.
+            return delegate.route(context);
+        }
+        String cached = cache.get(key);

Review Comment:
   The check-then-act here is non-atomic: `cache.get(key)` … 
`delegate.route(context)` … `cache.put(key, chosen)`. Two async-pool threads 
handling the same conversation key on its first touch can both miss the cache 
and both invoke `delegate.route()` — i.e. two judge-model calls for the same 
conversation. The map itself is `synchronizedMap`, so there's no corruption, 
and the redundant compute is benign (last-writer-wins on the same key), so I 
wouldn't add locking for it — the cost isn't worth it. My only question is 
about the wording: the class Javadoc here and the `LlmRoutingStrategy` Javadoc 
("the judge runs once per conversation") read as a hard guarantee, where this 
is really best-effort memoization. Would it be worth softening to "typically 
once per conversation" (or a one-line note that a concurrent first-touch can 
double-compute), so the contract matches the behavior? Or is the stronger 
phrasing intentional?
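
   For reference, a small self-contained sketch of the race. It assumes the 
get / route / put shape described above; `Memo` is a hypothetical stand-in for 
the decorator, and the access-ordered `LinkedHashMap` is only a guess at what 
`LruMap` does. A latch holds both threads inside the delegate so the double 
compute happens on every run of the sketch.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class BestEffortMemoSketch {

    /** Check-then-act memoizer over a synchronized LRU map (hypothetical stand-in). */
    static final class Memo {
        private final Map<String, String> cache;
        private final Function<String, String> delegate;

        Memo(Function<String, String> delegate, final int maxEntries) {
            this.delegate = delegate;
            this.cache = Collections.synchronizedMap(
                    new LinkedHashMap<String, String>(16, 0.75f, true) {
                        @Override
                        protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                            return size() > maxEntries; // evict past capacity
                        }
                    });
        }

        String route(String key) {
            String cached = cache.get(key);      // check
            if (cached != null) {
                return cached;
            }
            String chosen = delegate.apply(key); // compute, outside any lock
            if (chosen != null) {
                cache.put(key, chosen);          // act: last writer wins
            }
            return chosen;
        }
    }

    /** Two threads first-touch the same key; returns how often the delegate ran. */
    static int delegateCallsOnConcurrentFirstTouch() {
        AtomicInteger calls = new AtomicInteger();
        CountDownLatch bothInside = new CountDownLatch(2);
        Memo memo = new Memo(key -> {
            calls.incrementAndGet();
            bothInside.countDown();
            try {
                // Wait until the other thread has also missed the cache.
                bothInside.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "model-a";
        }, 8);

        Thread first = new Thread(() -> memo.route("conversation-1"));
        Thread second = new Thread(() -> memo.route("conversation-1"));
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        memo.route("conversation-1"); // served from the cache, no extra delegate call
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("delegate calls: " + delegateCallsOnConcurrentFirstTouch());
    }
}
```

   Both threads get the same answer and the map stays consistent; the only 
cost is the second delegate call, which is why a wording change looks like a 
better fit than a lock.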



##########
python/flink_agents/api/resource.py:
##########
@@ -356,5 +356,17 @@ class Java:
             # Milvus
             MILVUS_VECTOR_STORE = "org.apache.flink.agents.integrations.vectorstores.milvus.MilvusVectorStore"
 
+    class RoutingStrategy:

Review Comment:
   This is the only Python surface the feature gets — the mirrored 
`RoutingStrategy.Java.{RULE_BASED, LLM}` FQCN constants, with no Python 
`ChatModelRouter`, no Python `RoutingStrategy` SPI, and no Python tests. 
CLAUDE.md asks for Java/Python parity, though that's scoped to "when changing 
shared logic," and this PR adds a new Java resource resolved by name rather 
than touching shared logic. The mirrored constants read as a deliberate bridge 
— a Python agent points a `CHAT_MODEL` at the Java router by name. Is that the 
intended Python story for v1 ("reference the Java router by name," with a 
native Python SPI as a possible follow-up), or is Python routing out of scope 
for now? Asking mainly so the intent is on the record — the constants imply 
Python users are expected to reach this, so it'd help to state how far that's 
meant to go.



##########
api/src/main/java/org/apache/flink/agents/api/chat/model/routing/ChatModelRouter.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.chat.model.routing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.model.BaseChatModelSetup;
+import org.apache.flink.agents.api.resource.ResourceContext;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A pluggable LLM router that selects, per request, which underlying chat model should serve it.
+ *
+ * <p>The router is a drop-in {@link BaseChatModelSetup}: it reports {@link ResourceType#CHAT_MODEL}
+ * and is resolved by the built-in chat action exactly like any other model, so an agent points at
+ * the router by name and nothing else in the framework needs to change. Concerns are layered:
+ * selection ({@link RoutingStrategy}) decides the model, optional {@link CachingStrategy} memoizes
+ * that decision per conversation, and {@link FallbackPolicy} decides what to try if the chosen
+ * model errors. The router then delegates to the chosen model's own {@code chat(...)}, preserving
+ * its prompt, tools, parameters, and token metrics.
+ *
+ * <h2>Configuration ({@link ResourceDescriptor} arguments)</h2>
+ *
+ * <ul>
+ *   <li>{@code candidates} (required) — routable models; each a {@link RoutingCandidate}, a {@link
+ *       String} name, or a {@link Map} with {@code name}/{@code description}/{@code metadata}.
+ *       Every name must reference a registered {@code CHAT_MODEL} resource.
+ *   <li>{@code strategy} (required) — a {@link ResourceDescriptor} naming the {@link
+ *       RoutingStrategy} implementation and its args (use {@link
+ *       org.apache.flink.agents.api.resource.ResourceName.RoutingStrategy} for built-ins). The
+ *       class needs a public {@code (ResourceDescriptor, ResourceContext)} constructor (see {@link
+ *       AbstractRoutingStrategy}); it is loaded with the thread context classloader so user classes
+ *       resolve on a cluster.
+ *   <li>{@code fallback} (optional, default {@code false}) — when {@code true}, a failing model
+ *       falls back to the remaining candidates in declaration order.
+ *   <li>{@code cache} (optional, default {@code true}) — memoize the selection per conversation.
+ *   <li>{@code cache_size} (optional, default {@link CachingStrategy#DEFAULT_MAX_ENTRIES}) — LRU
+ *       capacity when caching is enabled.
+ *   <li>{@code default} (optional) — the candidate used when the strategy abstains or names a
+ *       non-candidate (a routing miss). Must be one of {@code candidates}; defaults to the first
+ *       candidate.
+ * </ul>
+ *
+ * <p><b>Graceful degrade:</b> if the strategy returns {@code null} ("no opinion", e.g. a transient
+ * LLM-judge failure) or a name that is not a configured candidate, the router treats it as a
+ * routing miss and serves the request from {@code default} (then the fallback order) rather than
+ * failing.
+ *
+ * <p><b>Bash/skill tool args (v1 scope):</b> the built-in chat action injects bash allowlists and
+ * skill directories from the resource resolved by the agent's model name — i.e. this router — not
+ * the chosen backend. So configure {@code allowed_commands} / {@code allowed_script_dirs} / {@code
+ * skills} <b>on the router</b>; per-candidate skills/allowlists are not supported in v1.
+ *
+ * <p><b>Metrics note (v1):</b> retry metrics recorded by the built-in chat action are grouped under
+ * this router's connection label ({@code "router"}), not the backend model actually used.
+ * Per-backend attribution is a documented follow-up.
+ */
+public class ChatModelRouter extends BaseChatModelSetup {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ChatModelRouter.class);
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    public static final String ARG_CANDIDATES = "candidates";
+    public static final String ARG_STRATEGY = "strategy";
+    public static final String ARG_FALLBACK = "fallback";
+    public static final String ARG_CACHE = "cache";
+    public static final String ARG_CACHE_SIZE = "cache_size";
+    public static final String ARG_DEFAULT = "default";
+
+    private final List<RoutingCandidate> candidates;
+    private final Set<String> candidateNames;
+    private final RoutingStrategy strategy;
+    private final FallbackPolicy fallbackPolicy;
+    private final boolean fallbackEnabled;
+    private final String defaultCandidate;
+
+    @SuppressWarnings("unchecked")
+    public ChatModelRouter(ResourceDescriptor descriptor, ResourceContext resourceContext) {
+        super(descriptor, resourceContext);
+
+        List<Object> rawCandidates = descriptor.getArgument(ARG_CANDIDATES);
+        if (rawCandidates == null || rawCandidates.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "ChatModelRouter requires a non-empty 'candidates' argument.");
+        }
+        List<RoutingCandidate> parsed = new ArrayList<>(rawCandidates.size());
+        Set<String> names = new LinkedHashSet<>();
+        for (Object spec : rawCandidates) {
+            RoutingCandidate candidate = RoutingCandidate.from(spec);
+            parsed.add(candidate);
+            names.add(candidate.getName());
+        }
+        this.candidates = Collections.unmodifiableList(parsed);
+        this.candidateNames = Collections.unmodifiableSet(names);
+
+        ResourceDescriptor strategyDescriptor =
+                toResourceDescriptor(descriptor.getArgument(ARG_STRATEGY));
+        RoutingStrategy base = instantiateStrategy(strategyDescriptor, resourceContext);
+
+        boolean cache = descriptor.getArgument(ARG_CACHE, Boolean.TRUE);
+        if (cache) {
+            int cacheSize =
+                    descriptor.getArgument(ARG_CACHE_SIZE, CachingStrategy.DEFAULT_MAX_ENTRIES);
+            this.strategy = new CachingStrategy(base, cacheSize);
+        } else {
+            this.strategy = base;
+        }
+
+        this.fallbackEnabled = descriptor.getArgument(ARG_FALLBACK, Boolean.FALSE);
+        this.fallbackPolicy =
+                fallbackEnabled ? FallbackPolicy.remainingCandidates() : FallbackPolicy.none();
+
+        // Default candidate used on a routing miss (strategy abstains / names a non-candidate).
+        // Validated at construction so a typo is a clear config error, not a per-request failure.
+        String configuredDefault = descriptor.getArgument(ARG_DEFAULT);
+        if (configuredDefault != null && !candidateNames.contains(configuredDefault)) {
+            throw new IllegalArgumentException(
+                    "ChatModelRouter 'default' '"
+                            + configuredDefault
+                            + "' is not one of the configured candidates "
+                            + candidateNames);
+        }
+        this.defaultCandidate =
+                configuredDefault != null ? configuredDefault : candidates.get(0).getName();
+    }
+
+    /**
+     * The router has no connection of its own to resolve (it delegates to candidate models, each of
+     * which resolves its own). Override to skip the base class's connection resolution.
+     */
+    @Override
+    public void open() {

Review Comment:
   Worth calling out the design here: the three-way split into selection / 
resilience / caching (`RoutingStrategy` / `FallbackPolicy` / `CachingStrategy`) 
lands the right seams, and the drop-in `CHAT_MODEL` story holds up against the 
real `ChatModelAction` + `ResourceCache` path. One non-obvious correctness 
point stood out at this `open()`: the no-op is safe precisely *because* a 
routed candidate is lazily `open()`-ed by `ResourceCache.getResource()` when 
it's resolved, so its `connection` is non-null before `chat()` runs. 
Documenting that invariant in the Javadoc rather than leaving it implicit is 
exactly right — it's the kind of thing that bites a future maintainer if it 
only lives in someone's head.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
