[
https://issues.apache.org/jira/browse/FLINK-8493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353583#comment-16353583
]
ASF GitHub Bot commented on FLINK-8493:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5339#discussion_r166221453
--- Diff:
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
---
@@ -205,32 +201,32 @@ private void executeActionAsync(
return cachedFuture;
}
- LOG.debug("Retrieving location for state={} of job={} from the
job manager.", jobId, queryableStateName);
-
- final CompletableFuture<KvStateLocation> location = new
CompletableFuture<>();
- lookupCache.put(cacheKey, location);
- return proxy.getJobManagerFuture().thenComposeAsync(
- jobManagerGateway -> {
- final Object msg = new
KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
- jobManagerGateway.ask(msg,
FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
-
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
- .onComplete(new
OnComplete<KvStateLocation>() {
-
- @Override
- public void
onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
- if
(failure != null) {
-
if (failure instanceof FlinkJobNotFoundException) {
-
// if the jobId was wrong, remove the entry from the cache.
-
lookupCache.remove(cacheKey);
-
}
-
location.completeExceptionally(failure);
- } else {
-
location.complete(loc);
- }
- }
- },
Executors.directExecutionContext());
- return location;
- }, queryExecutor);
+ final KvStateLocationOracle kvStateLocationOracle =
proxy.getKvStateLocationOracle(jobId);
+
+ if (kvStateLocationOracle != null) {
+ LOG.debug("Retrieving location for state={} of job={}
from the key-value state location oracle.", jobId, queryableStateName);
+ final CompletableFuture<KvStateLocation> location = new
CompletableFuture<>();
+ lookupCache.put(cacheKey, location);
+
+ kvStateLocationOracle
+ .requestKvStateLocation(jobId,
queryableStateName)
+ .whenComplete(
+ (KvStateLocation kvStateLocation,
Throwable throwable) -> {
+ if (throwable != null) {
+ if
(ExceptionUtils.stripCompletionException(throwable) instanceof
FlinkJobNotFoundException) {
+ // if the jobId
was wrong, remove the entry from the cache.
+
lookupCache.remove(cacheKey);
+ }
+
location.completeExceptionally(throwable);
+ } else {
+
location.complete(kvStateLocation);
+ }
+ });
+
+ return location;
+ } else {
+ return FutureUtils.completedExceptionally(new
UnknownJobManagerException());
--- End diff --
I'll rename it into `UnknownLocationException`. Since this exception is
marked as internal it should not be a problem.
> Integrate queryable state with Flip-6
> -------------------------------------
>
> Key: FLINK-8493
> URL: https://issues.apache.org/jira/browse/FLINK-8493
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination, Queryable State
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to make Flip-6 support queryable state, we have to start and
> register the {{KvStateServer}} and {{KvStateProxyClient}} in the
> {{TaskExecutor}}.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)