Repository: flink
Updated Branches:
  refs/heads/master 81dc260dc -> a0838de79


[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is 
thrown.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0838de7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0838de7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0838de7

Branch: refs/heads/master
Commit: a0838de79ff73b0322f3ce255df54f5f33b2bf3b
Parents: 81dc260
Author: kkloudas <kklou...@gmail.com>
Authored: Tue Nov 14 15:05:45 2017 +0100
Committer: kkloudas <kklou...@gmail.com>
Committed: Fri Nov 17 10:29:30 2017 +0100

----------------------------------------------------------------------
 .../network/AbstractServerHandler.java          |   2 +-
 .../client/proxy/KvStateClientProxyHandler.java |  11 +-
 .../itcases/AbstractQueryableStateTestBase.java | 230 ++++++++++++-------
 3 files changed, 150 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index 9e02291..7e71a11 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -262,7 +262,7 @@ public abstract class AbstractServerHandler<REQ extends 
MessageBody, RESP extend
                                        try {
                                                stats.reportFailedRequest();
 
-                                               final String errMsg = "Failed 
request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t);
+                                               final String errMsg = "Failed 
request " + requestId + "." + System.lineSeparator() + " Caused by: " + 
ExceptionUtils.stringifyException(t);
                                                final ByteBuf err = 
MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new 
RuntimeException(errMsg));
                                                ctx.writeAndFlush(err);
                                        } catch (IOException io) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index 73ef7f3..af33701 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.Preconditions;
 
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -133,12 +131,11 @@ public class KvStateClientProxyHandler extends 
AbstractServerHandler<KvStateRequ
                        operationFuture.whenCompleteAsync(
                                        (t, throwable) -> {
                                                if (throwable != null) {
-                                                       if (throwable 
instanceof CancellationException) {
-                                                               
result.completeExceptionally(throwable);
-                                                       } else if 
(throwable.getCause() instanceof UnknownKvStateIdException ||
+                                                       if (
+                                                                       
throwable.getCause() instanceof UnknownKvStateIdException ||
                                                                        
throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
-                                                                       
throwable.getCause() instanceof UnknownKvStateLocation ||
-                                                                       
throwable.getCause() instanceof ConnectException) {
+                                                                       
throwable.getCause() instanceof ConnectException
+                                                               ) {
 
                                                                // These 
failures are likely to be caused by out-of-sync
                                                                // 
KvStateLocation. Therefore we retry this query and

http://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index b4bae9c..c1cbb61 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -37,7 +37,6 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -89,12 +88,10 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
-import java.util.function.Supplier;
 
 import scala.concurrent.Await;
 import scala.concurrent.duration.Deadline;
@@ -103,15 +100,14 @@ import scala.reflect.ClassTag$;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Base class for queryable state integration tests with a configurable state 
backend.
  */
 public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
-       private static final int NO_OF_RETRIES = 100;
        private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(10000L, TimeUnit.SECONDS);
-       private static final Time QUERY_RETRY_DELAY = Time.milliseconds(100L);
 
        private final ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(4);
        private final ScheduledExecutor executor = new 
ScheduledExecutorServiceAdapter(executorService);
@@ -229,14 +225,14 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                                                allNonZero = false;
                                        }
 
-                                       
CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = 
getKvStateWithRetries(
+                                       
CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvState(
+                                                       deadline,
                                                        client,
                                                        jobId,
                                                        queryName,
                                                        key,
                                                        
BasicTypeInfo.INT_TYPE_INFO,
                                                        reducingState,
-                                                       QUERY_RETRY_DELAY,
                                                        false,
                                                        executor);
 
@@ -284,7 +280,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
         *
         * <b>NOTE: </b> This test is only in the non-HA variant of the tests 
because
         * in the HA mode we use the actual JM code which does not recognize the
-        * {@code NotifyWhenJobStatus} message.  *
+        * {@code NotifyWhenJobStatus} message.
         */
        @Test
        public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -439,6 +435,92 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
        }
 
        /**
+        * Tests that the correct exception is thrown if the query
+        * contains a wrong queryable state name.
+        */
+       @Test
+       public void testWrongQueryableStateName() throws Exception {
+               // Config
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+               final long numElements = 1024L;
+
+               JobID jobId = null;
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setStateBackend(stateBackend);
+                       env.setParallelism(maxParallelism);
+                       // Very important, because cluster is shared between 
tests and we
+                       // don't explicitly check that all slots are available 
before
+                       // submitting.
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
+
+                       DataStream<Tuple2<Integer, Long>> source = env
+                                       .addSource(new 
TestAscendingValueSource(numElements));
+
+                       // Value state
+                       ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
+                                       new ValueStateDescriptor<>("any", 
source.getType());
+
+                       source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
+                               private static final long serialVersionUID = 
7662520075515707428L;
+
+                               @Override
+                               public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       }).asQueryableState("hakuna", valueState);
+
+                       // Submit the job graph
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       jobId = jobGraph.getJobID();
+
+                       
CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture = 
FutureUtils.toJava(
+                                       
cluster.getLeaderGateway(deadline.timeLeft())
+                                                       .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), 
deadline.timeLeft())
+                                                       
.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
+
+                       cluster.submitJobDetached(jobGraph);
+
+                       // expect for the job to be running
+                       TestingJobManagerMessages.JobStatusIs jobStatus =
+                                       
runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       assertEquals(JobStatus.RUNNING, jobStatus.state());
+
+                       CompletableFuture<ValueState<Tuple2<Integer, Long>>> 
future = client.getKvState(
+                                       jobId,
+                                       "wrong-hankuna", // this is the wrong 
name.
+                                       0,
+                                       VoidNamespace.INSTANCE,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       VoidNamespaceTypeInfo.INSTANCE,
+                                       valueState);
+
+                       try {
+                               future.get();
+                               fail(); // by now the job must have failed.
+                       } catch (ExecutionException e) {
+                               Assert.assertTrue(e.getCause() instanceof 
RuntimeException);
+                               
Assert.assertTrue(e.getCause().getMessage().contains(
+                                               "UnknownKvStateLocation: No 
KvStateLocation found for KvState instance with name 'wrong-hankuna'."));
+                       } catch (Exception ignored) {
+                               fail("Unexpected type of exception.");
+                       }
+
+               } finally {
+                       // Free cluster resources
+                       if (jobId != null) {
+                               CompletableFuture<CancellationSuccess> 
cancellation = FutureUtils.toJava(cluster
+                                               
.getLeaderGateway(deadline.timeLeft())
+                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
+                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)));
+
+                               
cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       }
+               }
+       }
+
+       /**
         * Similar tests as {@link #testValueState()} but before submitting the
         * job, we already issue one request which fails.
         */
@@ -572,14 +654,14 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
 
                        // Now query
                        int key = 0;
-                       CompletableFuture<ValueState<Tuple2<Integer, Long>>> 
future = getKvStateWithRetries(
+                       CompletableFuture<ValueState<Tuple2<Integer, Long>>> 
future = getKvState(
+                                       deadline,
                                        client,
                                        jobId,
                                        queryableState.getQueryableStateName(),
                                        key,
                                        BasicTypeInfo.INT_TYPE_INFO,
                                        valueState,
-                                       QUERY_RETRY_DELAY,
                                        true,
                                        executor);
 
@@ -723,14 +805,14 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                        for (int key = 0; key < maxParallelism; key++) {
                                boolean success = false;
                                while (deadline.hasTimeLeft() && !success) {
-                                       
CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = 
getKvStateWithRetries(
+                                       
CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = 
getKvState(
+                                                       deadline,
                                                        client,
                                                        jobId,
                                                        "pumba",
                                                        key,
                                                        
BasicTypeInfo.INT_TYPE_INFO,
                                                        foldingState,
-                                                       QUERY_RETRY_DELAY,
                                                        false,
                                                        executor);
 
@@ -814,14 +896,14 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                        for (int key = 0; key < maxParallelism; key++) {
                                boolean success = false;
                                while (deadline.hasTimeLeft() && !success) {
-                                       
CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = 
getKvStateWithRetries(
+                                       
CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvState(
+                                                       deadline,
                                                        client,
                                                        jobId,
                                                        "jungle",
                                                        key,
                                                        
BasicTypeInfo.INT_TYPE_INFO,
                                                        reducingState,
-                                                       QUERY_RETRY_DELAY,
                                                        false,
                                                        executor);
 
@@ -923,14 +1005,14 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                        for (int key = 0; key < maxParallelism; key++) {
                                boolean success = false;
                                while (deadline.hasTimeLeft() && !success) {
-                                       CompletableFuture<MapState<Integer, 
Tuple2<Integer, Long>>> future = getKvStateWithRetries(
+                                       CompletableFuture<MapState<Integer, 
Tuple2<Integer, Long>>> future = getKvState(
+                                                       deadline,
                                                        client,
                                                        jobId,
                                                        "timon-queryable",
                                                        key,
                                                        
BasicTypeInfo.INT_TYPE_INFO,
                                                        mapStateDescriptor,
-                                                       QUERY_RETRY_DELAY,
                                                        false,
                                                        executor);
 
@@ -1028,14 +1110,14 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                        for (int key = 0; key < maxParallelism; key++) {
                                boolean success = false;
                                while (deadline.hasTimeLeft() && !success) {
-                                       CompletableFuture<ListState<Long>> 
future = getKvStateWithRetries(
+                                       final 
CompletableFuture<ListState<Long>> future = getKvState(
+                                                       deadline,
                                                        client,
                                                        jobId,
                                                        "list-queryable",
                                                        key,
                                                        
BasicTypeInfo.INT_TYPE_INFO,
                                                        listStateDescriptor,
-                                                       QUERY_RETRY_DELAY,
                                                        false,
                                                        executor);
 
@@ -1130,14 +1212,14 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                        for (int key = 0; key < maxParallelism; key++) {
                                boolean success = false;
                                while (deadline.hasTimeLeft() && !success) {
-                                       
CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = 
getKvStateWithRetries(
+                                       
CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = 
getKvState(
+                                                       deadline,
                                                        client,
                                                        jobId,
                                                        "aggr-queryable",
                                                        key,
                                                        
BasicTypeInfo.INT_TYPE_INFO,
                                                        aggrStateDescriptor,
-                                                       QUERY_RETRY_DELAY,
                                                        false,
                                                        executor);
 
@@ -1372,84 +1454,62 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
 
        /////                           General Utility Methods                 
        //////
 
-       private static <K, S extends State, V> CompletableFuture<S> 
getKvStateWithRetries(
+       private static <K, S extends State, V> CompletableFuture<S> getKvState(
+                       final Deadline deadline,
                        final QueryableStateClient client,
                        final JobID jobId,
                        final String queryName,
                        final K key,
                        final TypeInformation<K> keyTypeInfo,
                        final StateDescriptor<S, V> stateDescriptor,
-                       final Time retryDelay,
                        final boolean failForUnknownKeyOrNamespace,
-                       final ScheduledExecutor executor) {
-               return retryWithDelay(
-                               () -> client.getKvState(jobId, queryName, key, 
VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor),
-                               NO_OF_RETRIES,
-                               retryDelay,
-                               executor,
-                               failForUnknownKeyOrNamespace);
-       }
-
-       private static <T> CompletableFuture<T> retryWithDelay(
-                       final Supplier<CompletableFuture<T>> operation,
-                       final int retries,
-                       final Time retryDelay,
-                       final ScheduledExecutor scheduledExecutor,
-                       final boolean failIfUnknownKeyOrNamespace) {
-
-               final CompletableFuture<T> resultFuture = new 
CompletableFuture<>();
-
-               retryWithDelay(
-                               resultFuture,
-                               operation,
-                               retries,
-                               retryDelay,
-                               scheduledExecutor,
-                               failIfUnknownKeyOrNamespace);
+                       final ScheduledExecutor executor) throws 
InterruptedException {
 
+               final CompletableFuture<S> resultFuture = new 
CompletableFuture<>();
+               getKvStateIgnoringCertainExceptions(
+                               deadline, resultFuture, client, jobId, 
queryName, key, keyTypeInfo,
+                               stateDescriptor, failForUnknownKeyOrNamespace, 
executor);
                return resultFuture;
        }
 
-       public static <T> void retryWithDelay(
-                       final CompletableFuture<T> resultFuture,
-                       final Supplier<CompletableFuture<T>> operation,
-                       final int retries,
-                       final Time retryDelay,
-                       final ScheduledExecutor scheduledExecutor,
-                       final boolean failIfUnknownKeyOrNamespace) {
+       private static <K, S extends State, V> void 
getKvStateIgnoringCertainExceptions(
+                       final Deadline deadline,
+                       final CompletableFuture<S> resultFuture,
+                       final QueryableStateClient client,
+                       final JobID jobId,
+                       final String queryName,
+                       final K key,
+                       final TypeInformation<K> keyTypeInfo,
+                       final StateDescriptor<S, V> stateDescriptor,
+                       final boolean failForUnknownKeyOrNamespace,
+                       final ScheduledExecutor executor) throws 
InterruptedException {
 
                if (!resultFuture.isDone()) {
-                       final CompletableFuture<T> operationResultFuture = 
operation.get();
-                       operationResultFuture.whenCompleteAsync(
-                                       (t, throwable) -> {
-                                               if (throwable != null) {
-                                                       if 
(throwable.getCause() instanceof CancellationException) {
-                                                               
resultFuture.completeExceptionally(new FutureUtils.RetryException("Operation 
future was cancelled.", throwable.getCause()));
-                                                       } else if 
(throwable.getCause() instanceof AssertionError ||
-                                                                       
(failIfUnknownKeyOrNamespace && throwable.getCause() instanceof 
UnknownKeyOrNamespaceException)) {
-                                                               
resultFuture.completeExceptionally(throwable.getCause());
-                                                       } else {
-                                                               if (retries > 
0) {
-                                                                       final 
ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
-                                                                               
        () -> retryWithDelay(resultFuture, operation, retries - 1, retryDelay, 
scheduledExecutor, failIfUnknownKeyOrNamespace),
-                                                                               
        retryDelay.toMilliseconds(),
-                                                                               
        TimeUnit.MILLISECONDS);
-
-                                                                       
resultFuture.whenComplete(
-                                                                               
        (innerT, innerThrowable) -> scheduledFuture.cancel(false));
-                                                               } else {
-                                                                       
resultFuture.completeExceptionally(new FutureUtils.RetryException("Could not 
complete the operation. Number of retries " +
-                                                                               
        "has been exhausted.", throwable));
-                                                               }
-                                                       }
-                                               } else {
-                                                       
resultFuture.complete(t);
+                       Thread.sleep(100L);
+                       CompletableFuture<S> expected = 
client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, 
VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
+                       expected.whenCompleteAsync((result, throwable) -> {
+                               if (throwable != null) {
+                                       if (
+                                                       throwable.getCause() 
instanceof CancellationException ||
+                                                       throwable.getCause() 
instanceof AssertionError ||
+                                                       
(failForUnknownKeyOrNamespace && throwable.getCause() instanceof 
UnknownKeyOrNamespaceException)
+                                       ) {
+                                               
resultFuture.completeExceptionally(throwable.getCause());
+                                       } else if (deadline.hasTimeLeft()) {
+                                               try {
+                                                       
getKvStateIgnoringCertainExceptions(
+                                                                       
deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
+                                                                       
stateDescriptor, failForUnknownKeyOrNamespace, executor);
+                                               } catch (InterruptedException 
e) {
+                                                       e.printStackTrace();
                                                }
-                                       },
-                                       scheduledExecutor);
+                                       }
+                               } else {
+                                       resultFuture.complete(result);
+                               }
+                       }, executor);
 
-                       resultFuture.whenComplete(
-                                       (t, throwable) -> 
operationResultFuture.cancel(false));
+                       resultFuture.whenComplete((result, throwable) -> 
expected.cancel(false));
                }
        }
 
@@ -1468,14 +1528,14 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                for (int key = 0; key < maxParallelism; key++) {
                        boolean success = false;
                        while (deadline.hasTimeLeft() && !success) {
-                               CompletableFuture<ValueState<Tuple2<Integer, 
Long>>> future = getKvStateWithRetries(
+                               CompletableFuture<ValueState<Tuple2<Integer, 
Long>>> future = getKvState(
+                                               deadline,
                                                client,
                                                jobId,
                                                queryableStateName,
                                                key,
                                                BasicTypeInfo.INT_TYPE_INFO,
                                                stateDescriptor,
-                                               QUERY_RETRY_DELAY,
                                                false,
                                                executor);
 

Reply via email to