Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5339#discussion_r165939136
--- Diff:
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
---
@@ -205,32 +201,32 @@ private void executeActionAsync(
return cachedFuture;
}
- LOG.debug("Retrieving location for state={} of job={} from the
job manager.", jobId, queryableStateName);
-
- final CompletableFuture<KvStateLocation> location = new
CompletableFuture<>();
- lookupCache.put(cacheKey, location);
- return proxy.getJobManagerFuture().thenComposeAsync(
- jobManagerGateway -> {
- final Object msg = new
KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
- jobManagerGateway.ask(msg,
FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
-
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
- .onComplete(new
OnComplete<KvStateLocation>() {
-
- @Override
- public void
onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
- if
(failure != null) {
-
if (failure instanceof FlinkJobNotFoundException) {
-
// if the jobId was wrong, remove the entry from the cache.
-
lookupCache.remove(cacheKey);
-
}
-
location.completeExceptionally(failure);
- } else {
-
location.complete(loc);
- }
- }
- },
Executors.directExecutionContext());
- return location;
- }, queryExecutor);
+ final KvStateLocationOracle kvStateLocationOracle =
proxy.getKvStateLocationOracle(jobId);
+
+ if (kvStateLocationOracle != null) {
+ LOG.debug("Retrieving location for state={} of job={}
from the key-value state location oracle.", jobId, queryableStateName);
+ final CompletableFuture<KvStateLocation> location = new
CompletableFuture<>();
+ lookupCache.put(cacheKey, location);
+
+ kvStateLocationOracle
+ .requestKvStateLocation(jobId,
queryableStateName)
+ .whenComplete(
+ (KvStateLocation kvStateLocation,
Throwable throwable) -> {
+ if (throwable != null) {
+ if
(ExceptionUtils.stripCompletionException(throwable) instanceof
FlinkJobNotFoundException) {
+ // if the jobId
was wrong, remove the entry from the cache.
+
lookupCache.remove(cacheKey);
+ }
+
location.completeExceptionally(throwable);
+ } else {
+
location.complete(kvStateLocation);
+ }
+ });
+
+ return location;
+ } else {
+ return FutureUtils.completedExceptionally(new
UnknownJobManagerException());
--- End diff --
I would be in favor of renaming this exception. It is still an
`UnknownJobManagerException`, but the role of the job manager is now played by
the oracle. I understand that a rename may change user-facing behavior, and that
`UnknownOracleException` is a bit cryptic, but I expect we can find a more
fitting name. Even if we leave it as it is for now, we should open a JIRA to
track the rename.
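
To make the suggestion concrete, here is a rough sketch of the lookup path with a renamed exception. It is only an illustration and is written without Flink on the classpath: `UnknownLocationOracleException` is a made-up name, not an agreed one, and `LocationOracle` and `JobNotFoundException` are simplified stand-ins for `KvStateLocationOracle` and `FlinkJobNotFoundException`. The location is reduced to a plain `String`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

public class LocationLookupSketch {

    /** Hypothetical replacement name for UnknownJobManagerException; not part of Flink. */
    public static class UnknownLocationOracleException extends Exception {
        public UnknownLocationOracleException() {
            super("No key-value state location oracle is known for the job.");
        }
    }

    /** Stand-in for FlinkJobNotFoundException so the sketch compiles on its own. */
    public static class JobNotFoundException extends Exception {
        public JobNotFoundException(String jobId) {
            super("Could not find job " + jobId);
        }
    }

    /** Simplified stand-in for KvStateLocationOracle. */
    public interface LocationOracle {
        CompletableFuture<String> requestLocation(String jobId, String stateName);
    }

    private final Map<String, CompletableFuture<String>> lookupCache = new ConcurrentHashMap<>();

    public CompletableFuture<String> lookup(LocationOracle oracle, String jobId, String stateName) {
        final String cacheKey = jobId + "/" + stateName;
        final CompletableFuture<String> cached = lookupCache.get(cacheKey);
        if (cached != null) {
            return cached;
        }

        if (oracle == null) {
            // No oracle for this job: fail with the renamed exception.
            final CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new UnknownLocationOracleException());
            return failed;
        }

        final CompletableFuture<String> location = new CompletableFuture<>();
        lookupCache.put(cacheKey, location);

        oracle.requestLocation(jobId, stateName).whenComplete((loc, throwable) -> {
            if (throwable != null) {
                // Unwrap a CompletionException before inspecting the failure type.
                final Throwable cause =
                    (throwable instanceof CompletionException && throwable.getCause() != null)
                        ? throwable.getCause()
                        : throwable;
                if (cause instanceof JobNotFoundException) {
                    // If the jobId was wrong, remove the entry from the cache.
                    lookupCache.remove(cacheKey);
                }
                location.completeExceptionally(throwable);
            } else {
                location.complete(loc);
            }
        });

        return location;
    }

    public boolean isCached(String jobId, String stateName) {
        return lookupCache.containsKey(jobId + "/" + stateName);
    }
}
```

The only behavioral difference from the code in the diff would be the exception type returned when no oracle is available, which is exactly the user-facing part that a JIRA should cover.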
---