diggle1 commented on code in PR #99:
URL: 
https://github.com/apache/doris-kafka-connector/pull/99#discussion_r3232079524


##########
src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java:
##########
@@ -43,7 +54,51 @@ public static BackendUtils getInstance(DorisOptions 
dorisOptions, Logger logger)
         return new BackendUtils(RestService.getBackendsV2(dorisOptions, 
logger));
     }
 
+    /**
+     * Pick a usable backend. The previously chosen backend is reused while it 
is still within the
+     * cache TTL, so the hot write path does not pay for an HTTP probe on 
every call. When the cache
+     * is empty/expired we fall back to the round-robin probe behaviour.
+     */
     public String getAvailableBackend() {
+        String cached = cachedBackend;
+        if (cached != null && !isCacheExpired()) {
+            return cached;
+        }
+
+        synchronized (lock) {

Review Comment:
   You're right. Since this only runs in a single thread, the lock is 
unnecessary. I'll remove it.



##########
src/main/java/org/apache/doris/kafka/connector/utils/BackendUtils.java:
##########
@@ -43,7 +54,51 @@ public static BackendUtils getInstance(DorisOptions 
dorisOptions, Logger logger)
         return new BackendUtils(RestService.getBackendsV2(dorisOptions, 
logger));
     }
 
+    /**
+     * Pick a usable backend. The previously chosen backend is reused while it 
is still within the
+     * cache TTL, so the hot write path does not pay for an HTTP probe on 
every call. When the cache
+     * is empty/expired we fall back to the round-robin probe behaviour.
+     */
     public String getAvailableBackend() {
+        String cached = cachedBackend;
+        if (cached != null && !isCacheExpired()) {
+            return cached;
+        }
+
+        synchronized (lock) {
+            cached = cachedBackend;
+            if (cached != null && !isCacheExpired()) {
+                return cached;
+            }
+
+            String picked = pickBackendLocked();
+            cachedBackend = picked;
+            cachedAtNanos = System.nanoTime();
+            return picked;
+        }
+    }
+
+    /**
+     * Invalidate the cached backend. Callers should invoke this after a 
stream load / commit
+     * failure so that the next {@link #getAvailableBackend()} probes a fresh 
node instead of
+     * returning the failing one again.
+     */
+    public void invalidateCache() {
+        synchronized (lock) {

Review Comment:
   You're right. Since this only runs in a single thread, the lock is 
unnecessary. I'll remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to