Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5062#discussion_r155225199
  
    --- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 ---
    @@ -260,89 +260,90 @@ public void testDuplicateRegistrationFailsJob() 
throws Exception {
                final Deadline deadline = TEST_TIMEOUT.fromNow();
                final int numKeys = 256;
     
    -           JobID jobId = null;
    +           StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +           env.setStateBackend(stateBackend);
    +           env.setParallelism(maxParallelism);
    +           // Very important, because cluster is shared between tests and 
we
    +           // don't explicitly check that all slots are available before
    +           // submitting.
    +           
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
     
    -           try {
    -                   //
    -                   // Test program
    -                   //
    -                   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    -                   env.setStateBackend(stateBackend);
    -                   env.setParallelism(maxParallelism);
    -                   // Very important, because cluster is shared between 
tests and we
    -                   // don't explicitly check that all slots are available 
before
    -                   // submitting.
    -                   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
    -
    -                   DataStream<Tuple2<Integer, Long>> source = env
    -                                   .addSource(new 
TestKeyRangeSource(numKeys));
    -
    -                   // Reducing state
    -                   ReducingStateDescriptor<Tuple2<Integer, Long>> 
reducingState = new ReducingStateDescriptor<>(
    -                                   "any-name",
    -                                   new SumReduce(),
    -                                   source.getType());
    -
    -                   final String queryName = "duplicate-me";
    -
    -                   final QueryableStateStream<Integer, Tuple2<Integer, 
Long>> queryableState =
    -                                   source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
    -                                           private static final long 
serialVersionUID = -4126824763829132959L;
    -
    -                                           @Override
    -                                           public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
    -                                                   return value.f0;
    -                                           }
    -                                   }).asQueryableState(queryName, 
reducingState);
    +           DataStream<Tuple2<Integer, Long>> source = env.addSource(new 
TestKeyRangeSource(numKeys));
     
    -                   final QueryableStateStream<Integer, Tuple2<Integer, 
Long>> duplicate =
    -                                   source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
    -                                           private static final long 
serialVersionUID = -6265024000462809436L;
    +           // Reducing state
    +           ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = 
new ReducingStateDescriptor<>(
    +                           "any-name",
    +                           new SumReduce(),
    +                           source.getType());
     
    -                                           @Override
    -                                           public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
    -                                                   return value.f0;
    -                                           }
    -                                   }).asQueryableState(queryName);
    +           final String queryName = "duplicate-me";
     
    -                   // Submit the job graph
    -                   JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    -                   jobId = jobGraph.getJobID();
    +           final QueryableStateStream<Integer, Tuple2<Integer, Long>> 
queryableState =
    +                           source.keyBy(new KeySelector<Tuple2<Integer, 
Long>, Integer>() {
    +                                   private static final long 
serialVersionUID = -4126824763829132959L;
     
    -                   final 
CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture =
    -                                   notifyWhenJobStatusIs(jobId, 
JobStatus.FAILED, deadline);
    +                                   @Override
    +                                   public Integer getKey(Tuple2<Integer, 
Long> value) {
    +                                           return value.f0;
    +                                   }
    +                           }).asQueryableState(queryName, reducingState);
     
    -                   cluster.submitJobDetached(jobGraph);
    +           final QueryableStateStream<Integer, Tuple2<Integer, Long>> 
duplicate =
    +                           source.keyBy(new KeySelector<Tuple2<Integer, 
Long>, Integer>() {
    +                                   private static final long 
serialVersionUID = -6265024000462809436L;
     
    -                   TestingJobManagerMessages.JobStatusIs jobStatus =
    -                                   
failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    -                   assertEquals(JobStatus.FAILED, jobStatus.state());
    +                                   @Override
    +                                   public Integer getKey(Tuple2<Integer, 
Long> value) {
    +                                           return value.f0;
    +                                   }
    +                           }).asQueryableState(queryName);
    +
    +           // Submit the job graph
    +           final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +           final JobID jobId = jobGraph.getJobID();
    +
    +           final CompletableFuture<TestingJobManagerMessages.JobStatusIs> 
failedFuture =
    +                           notifyWhenJobStatusIs(jobId, JobStatus.FAILED, 
deadline);
     
    -                   // Get the job and check the cause
    -                   JobManagerMessages.JobFound jobFound = 
FutureUtils.toJava(
    -                                   
cluster.getLeaderGateway(deadline.timeLeft())
    -                                                   .ask(new 
JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
    -                                                   
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
    -                                   .get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
    +           final CompletableFuture<TestingJobManagerMessages.JobStatusIs> 
cancellationFuture =
    +                           notifyWhenJobStatusIs(jobId, 
JobStatus.CANCELED, deadline);
     
    -                   String failureCause = 
jobFound.executionGraph().getFailureCause().getExceptionAsString();
    +           cluster.submitJobDetached(jobGraph);
     
    -                   assertTrue("Not instance of SuppressRestartsException", 
failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
    -                   int causedByIndex = failureCause.indexOf("Caused by: ");
    -                   String subFailureCause = 
failureCause.substring(causedByIndex + "Caused by: ".length());
    -                   assertTrue("Not caused by IllegalStateException", 
subFailureCause.startsWith("java.lang.IllegalStateException"));
    -                   assertTrue("Exception does not contain registration 
name", subFailureCause.contains(queryName));
    +           try {
    +                   final TestingJobManagerMessages.JobStatusIs jobStatus =
    +                                   
failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    +                   assertEquals(JobStatus.FAILED, jobStatus.state());
    --- End diff --
    
    isn't this always true if the future did not time out? (In which case get() 
throws a TimeoutException)


---

Reply via email to