davidradl commented on code in PR #27568:
URL: https://github.com/apache/flink/pull/27568#discussion_r2787880853
##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonUtils.java:
##########
@@ -51,40 +66,148 @@ public class TritonUtils {
private static final Object LOCK = new Object();
- private static final Map<Long, ClientValue> cache = new HashMap<>();
+ private static final Map<ClientKey, ClientValue> cache = new HashMap<>();
+
+ /** Connection pool configuration holder. */
+ public static class ConnectionPoolConfig {
+ public final int maxIdleConnections;
+ public final long keepAliveDurationMs;
+ public final int maxTotalConnections;
+ public final long connectionTimeoutMs;
+ public final boolean reuseEnabled;
+ public final boolean monitoringEnabled;
+
+ public ConnectionPoolConfig(
+ int maxIdleConnections,
+ long keepAliveDurationMs,
+ int maxTotalConnections,
+ long connectionTimeoutMs,
+ boolean reuseEnabled,
+ boolean monitoringEnabled) {
+ this.maxIdleConnections = maxIdleConnections;
+ this.keepAliveDurationMs = keepAliveDurationMs;
+ this.maxTotalConnections = maxTotalConnections;
+ this.connectionTimeoutMs = connectionTimeoutMs;
+ this.reuseEnabled = reuseEnabled;
+ this.monitoringEnabled = monitoringEnabled;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ConnectionPoolConfig that = (ConnectionPoolConfig) o;
+ return maxIdleConnections == that.maxIdleConnections
+ && keepAliveDurationMs == that.keepAliveDurationMs
+ && maxTotalConnections == that.maxTotalConnections
+ && connectionTimeoutMs == that.connectionTimeoutMs
+ && reuseEnabled == that.reuseEnabled
+ && monitoringEnabled == that.monitoringEnabled;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ maxIdleConnections,
+ keepAliveDurationMs,
+ maxTotalConnections,
+ connectionTimeoutMs,
+ reuseEnabled,
+ monitoringEnabled);
+ }
+ }
/**
* Creates or retrieves a cached HTTP client with the specified
configuration.
*
* <p>This method implements reference-counted client pooling. Clients
with identical timeout
- * settings are shared across multiple callers.
+ * and pool settings are shared across multiple callers.
*
- * @param timeoutMs Timeout in milliseconds for connect, read, and write
operations
+ * @param timeoutMs Timeout in milliseconds for read and write operations
+ * @param poolConfig Connection pool configuration
* @return A shared or new OkHttpClient instance
*/
- public static OkHttpClient createHttpClient(long timeoutMs) {
+ public static OkHttpClient createHttpClient(long timeoutMs,
ConnectionPoolConfig poolConfig) {
+ ClientKey key = new ClientKey(timeoutMs, poolConfig);
+
synchronized (LOCK) {
- ClientValue value = cache.get(timeoutMs);
+ ClientValue value = cache.get(key);
if (value != null) {
- LOG.debug("Returning an existing Triton HTTP client.");
+ LOG.debug("Returning existing Triton HTTP client (reference
count: {}).",
+ value.referenceCount.get() + 1);
value.referenceCount.incrementAndGet();
Review Comment:
why not but the DEBUG after the value.referenceCount.incrementAndGet(); so
you do not need to increment it in the debug string. Maybe
```
AtomicInteger newReferenceCount = value.referenceCount.incrementAndGet();
LOG.debug("Returning existing Triton HTTP client (reference count: {}).",
newReferenceCount );
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]