[ https://issues.apache.org/jira/browse/FLINK-8493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353583#comment-16353583 ]

ASF GitHub Bot commented on FLINK-8493:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5339#discussion_r166221453
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java ---
    @@ -205,32 +201,32 @@ private void executeActionAsync(
                        return cachedFuture;
                }
     
    -           LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName);
    -
    -           final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
    -           lookupCache.put(cacheKey, location);
    -           return proxy.getJobManagerFuture().thenComposeAsync(
    -                           jobManagerGateway -> {
    -                                   final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
    -                                   jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
    -                                                   .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
    -                                                   .onComplete(new OnComplete<KvStateLocation>() {
    -
    -                                                           @Override
    -                                                           public void onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
    -                                                                   if (failure != null) {
    -                                                                           if (failure instanceof FlinkJobNotFoundException) {
    -                                                                                   // if the jobId was wrong, remove the entry from the cache.
    -                                                                                   lookupCache.remove(cacheKey);
    -                                                                           }
    -                                                                           location.completeExceptionally(failure);
    -                                                                   } else {
    -                                                                           location.complete(loc);
    -                                                                   }
    -                                                           }
    -                                                   }, Executors.directExecutionContext());
    -                                   return location;
    -                           }, queryExecutor);
    +           final KvStateLocationOracle kvStateLocationOracle = proxy.getKvStateLocationOracle(jobId);
    +
    +           if (kvStateLocationOracle != null) {
    +                   LOG.debug("Retrieving location for state={} of job={} from the key-value state location oracle.", jobId, queryableStateName);
    +                   final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
    +                   lookupCache.put(cacheKey, location);
    +
    +                   kvStateLocationOracle
    +                           .requestKvStateLocation(jobId, queryableStateName)
    +                           .whenComplete(
    +                                   (KvStateLocation kvStateLocation, Throwable throwable) -> {
    +                                           if (throwable != null) {
    +                                                   if (ExceptionUtils.stripCompletionException(throwable) instanceof FlinkJobNotFoundException) {
    +                                                           // if the jobId was wrong, remove the entry from the cache.
    +                                                           lookupCache.remove(cacheKey);
    +                                                   }
    +                                                   location.completeExceptionally(throwable);
    +                                           } else {
    +                                                   location.complete(kvStateLocation);
    +                                           }
    +                                   });
    +
    +                   return location;
    +           } else {
    +                   return FutureUtils.completedExceptionally(new UnknownJobManagerException());
    --- End diff --
    
    I'll rename it to `UnknownLocationException`. Since this exception is
    marked as internal, the rename should not be a problem.
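
For readers following the thread without the Flink sources at hand, the lookup pattern in the diff can be sketched with JDK classes only. This is a minimal illustration, not the code from the pull request: `LocationLookupSketch`, `LocationOracle`, `JobNotFoundException` and the local `stripCompletionException` helper are hypothetical stand-ins, the `String` location type is an assumption, and `UnknownLocationException` is modelled only after the name proposed in this comment.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class LocationLookupSketch {

    /** Hypothetical stand-in for the internal exception named in the comment. */
    static class UnknownLocationException extends Exception {
        UnknownLocationException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a "job not found" failure. */
    static class JobNotFoundException extends Exception {
        JobNotFoundException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the location oracle; locations are plain strings here. */
    interface LocationOracle {
        CompletableFuture<String> requestLocation(String jobId, String stateName);
    }

    private final Map<String, CompletableFuture<String>> lookupCache = new ConcurrentHashMap<>();
    private final Function<String, LocationOracle> oracleLookup;

    LocationLookupSketch(Function<String, LocationOracle> oracleLookup) {
        this.oracleLookup = oracleLookup;
    }

    CompletableFuture<String> lookup(String jobId, String stateName) {
        final String cacheKey = jobId + "/" + stateName;
        final CompletableFuture<String> cached = lookupCache.get(cacheKey);
        if (cached != null) {
            return cached;
        }

        final LocationOracle oracle = oracleLookup.apply(jobId);
        if (oracle == null) {
            // No oracle registered for this job: fail fast and do not cache anything.
            final CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new UnknownLocationException("No location oracle for job " + jobId));
            return failed;
        }

        // Cache the pending future first so the callback below can evict it.
        final CompletableFuture<String> location = new CompletableFuture<>();
        lookupCache.put(cacheKey, location);

        oracle.requestLocation(jobId, stateName).whenComplete((loc, throwable) -> {
            if (throwable != null) {
                // Later stages may wrap the cause in a CompletionException, so unwrap first.
                if (stripCompletionException(throwable) instanceof JobNotFoundException) {
                    // The job id was wrong: drop the entry so it is not served again.
                    lookupCache.remove(cacheKey);
                }
                location.completeExceptionally(throwable);
            } else {
                location.complete(loc);
            }
        });
        return location;
    }

    /** Unwraps nested CompletionExceptions down to the underlying cause. */
    static Throwable stripCompletionException(Throwable throwable) {
        Throwable current = throwable;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    int cacheSize() {
        return lookupCache.size();
    }
}
```

In this sketch the missing-oracle case never touches the cache, so a later call for the same job can still succeed once an oracle becomes available.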


> Integrate queryable state with Flip-6
> -------------------------------------
>
>                 Key: FLINK-8493
>                 URL: https://issues.apache.org/jira/browse/FLINK-8493
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination, Queryable State
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> In order to make Flip-6 support queryable state, we have to start and 
> register the {{KvStateServer}} and {{KvStateProxyClient}} in the 
> {{TaskExecutor}}.



