Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5021#discussion_r151179694
  
    --- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
    @@ -439,6 +443,85 @@ public Integer getKey(Tuple2<Integer, Long> value) 
throws Exception {
        }
     
        /**
    +    * Tests that the correct exception is thrown if the query
    +    * contains a wrong queryable state name.
    +    */
    +   @Test
    +   public void testWrongQueryableStateName() throws Exception {
    +           // Config
    +           final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +           final long numElements = 1024L;
    +
    +           JobID jobId = null;
    +           try {
    +                   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +                   env.setStateBackend(stateBackend);
    +                   env.setParallelism(maxParallelism);
    +                   // Very important, because cluster is shared between 
tests and we
    +                   // don't explicitly check that all slots are available 
before
    +                   // submitting.
    +                   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
    +
    +                   DataStream<Tuple2<Integer, Long>> source = env
    +                                   .addSource(new 
TestAscendingValueSource(numElements));
    +
    +                   // Value state
    +                   ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
    +                                   new ValueStateDescriptor<>("any", 
source.getType());
    +
    +                   source.keyBy(new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
    +                           private static final long serialVersionUID = 
7662520075515707428L;
    +
    +                           @Override
    +                           public Integer getKey(Tuple2<Integer, Long> 
value) throws Exception {
    +                                   return value.f0;
    +                           }
    +                   }).asQueryableState("hakuna", valueState);
    +
    +                   // Submit the job graph
    +                   JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +                   jobId = jobGraph.getJobID();
    +
    +                   cluster.submitJobDetached(jobGraph);
    +
    +                   // wait until the job is running before starting to 
query.
    +                   
FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft())
    +                                   .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), 
deadline.timeLeft())
    +                                   
.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
    +
    +                   CompletableFuture<ValueState<Tuple2<Integer, Long>>> 
future = client.getKvState(
    +                                   jobId,
    +                                   "wrong-hankuna", // this is the wrong 
name.
    +                                   0,
    +                                   VoidNamespace.INSTANCE,
    +                                   BasicTypeInfo.INT_TYPE_INFO,
    +                                   VoidNamespaceTypeInfo.INSTANCE,
    +                                   valueState);
    +
    +                   final CompletableFuture<?> completion = new 
CompletableFuture<>();
    +                   future.whenComplete((result, throwable) -> {
    +                           Assert.assertTrue(throwable != null);
    +                           
Assert.assertTrue(throwable.getMessage().contains("UnknownKvStateLocation"));
    +                           completion.complete(null);
    +                   });
    +
    +                   completion.join();
    --- End diff --
    
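    If the test fails, this will block indefinitely: when one of the assertions inside the `whenComplete` callback throws, `completion.complete(null)` is never reached, so `completion.join()` never returns.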


---

Reply via email to