Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5339#discussion_r165939136
  
    --- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 ---
    @@ -205,32 +201,32 @@ private void executeActionAsync(
                        return cachedFuture;
                }
     
    -           LOG.debug("Retrieving location for state={} of job={} from the 
job manager.", jobId, queryableStateName);
    -
    -           final CompletableFuture<KvStateLocation> location = new 
CompletableFuture<>();
    -           lookupCache.put(cacheKey, location);
    -           return proxy.getJobManagerFuture().thenComposeAsync(
    -                           jobManagerGateway -> {
    -                                   final Object msg = new 
KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
    -                                   jobManagerGateway.ask(msg, 
FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
    -                                                   
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
    -                                                   .onComplete(new 
OnComplete<KvStateLocation>() {
    -
    -                                                           @Override
    -                                                           public void 
onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
    -                                                                   if 
(failure != null) {
    -                                                                           
if (failure instanceof FlinkJobNotFoundException) {
    -                                                                           
        // if the jobId was wrong, remove the entry from the cache.
    -                                                                           
        lookupCache.remove(cacheKey);
    -                                                                           
}
    -                                                                           
location.completeExceptionally(failure);
    -                                                                   } else {
    -                                                                           
location.complete(loc);
    -                                                                   }
    -                                                           }
    -                                                   }, 
Executors.directExecutionContext());
    -                                   return location;
    -                           }, queryExecutor);
    +           final KvStateLocationOracle kvStateLocationOracle = 
proxy.getKvStateLocationOracle(jobId);
    +
    +           if (kvStateLocationOracle != null) {
    +                   LOG.debug("Retrieving location for state={} of job={} 
from the key-value state location oracle.", jobId, queryableStateName);
    +                   final CompletableFuture<KvStateLocation> location = new 
CompletableFuture<>();
    +                   lookupCache.put(cacheKey, location);
    +
    +                   kvStateLocationOracle
    +                           .requestKvStateLocation(jobId, 
queryableStateName)
    +                           .whenComplete(
    +                                   (KvStateLocation kvStateLocation, 
Throwable throwable) -> {
    +                                           if (throwable != null) {
    +                                                   if 
(ExceptionUtils.stripCompletionException(throwable) instanceof 
FlinkJobNotFoundException) {
    +                                                           // if the jobId 
was wrong, remove the entry from the cache.
    +                                                           
lookupCache.remove(cacheKey);
    +                                                   }
    +                                                   
location.completeExceptionally(throwable);
    +                                           } else {
    +                                                   
location.complete(kvStateLocation);
    +                                           }
    +                                   });
    +
    +                   return location;
    +           } else {
    +                   return FutureUtils.completedExceptionally(new 
UnknownJobManagerException());
    --- End diff --
    
    I would be up for changing the name of this exception, as now it is an 
`UnknownJobManagerException`. The role of the job manager now is played by the 
oracle. I understand that this may change user-facing behavior, and that 
`UnknowOracleException` is a bit cryptic, but I suppose we can find something 
more adequate. And even if we leave it like this for now, we should open a JIRA 
for the future.


---

Reply via email to