[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a68d752 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a68d752 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a68d752 Branch: refs/heads/release-1.4 Commit: 1a68d7527932b12bd2cb392c7c7781023756bf0c Parents: 12b0c58 Author: kkloudas <[email protected]> Authored: Thu Nov 16 17:45:49 2017 +0100 Committer: kkloudas <[email protected]> Committed: Fri Nov 17 11:20:55 2017 +0100 ---------------------------------------------------------------------- .../itcases/AbstractQueryableStateTestBase.java | 32 +++++++++++++++----- .../flink/runtime/jobmanager/JobManager.scala | 4 +-- .../runtime/jobmanager/JobManagerTest.java | 5 +-- 3 files changed, 29 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index a789dbd..65e9bb5 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -276,10 +276,6 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { /** * Tests that duplicate query registrations fail the job at the JobManager. - * - * <b>NOTE: </b> This test is only in the non-HA variant of the tests because - * in the HA mode we use the actual JM code which does not recognize the - * {@code NotifyWhenJobStatus} message. */ @Test public void testDuplicateRegistrationFailsJob() throws Exception { @@ -435,10 +431,10 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { /** * Tests that the correct exception is thrown if the query - * contains a wrong queryable state name. + * contains a wrong jobId or wrong queryable state name. */ @Test - public void testWrongQueryableStateName() throws Exception { + public void testWrongJobIdAndWrongQueryableStateName() throws Exception { // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); @@ -486,7 +482,27 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); assertEquals(JobStatus.RUNNING, jobStatus.state()); - CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState( + final JobID wrongJobId = new JobID(); + + CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture = client.getKvState( + wrongJobId, // this is the wrong job id + "hankuna", + 0, + BasicTypeInfo.INT_TYPE_INFO, + valueState); + + try { + unknownJobFuture.get(); + fail(); // by now the job must have failed. + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof RuntimeException); + Assert.assertTrue(e.getCause().getMessage().contains( + "FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")")); + } catch (Exception ignored) { + fail("Unexpected type of exception."); + } + + CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName = client.getKvState( jobId, "wrong-hankuna", // this is the wrong name. 0, @@ -494,7 +510,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { valueState); try { - future.get(); + unknownQSName.get(); fail(); // by now the job must have failed. } catch (ExecutionException e) { Assert.assertTrue(e.getCause() instanceof RuntimeException); http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 4fb1196..f57637a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -70,7 +70,7 @@ import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState import org.apache.flink.runtime.messages.accumulators._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint} import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _} -import org.apache.flink.runtime.messages.{Acknowledge, StackTrace} +import org.apache.flink.runtime.messages.{Acknowledge, FlinkJobNotFoundException, StackTrace} import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup import org.apache.flink.runtime.metrics.util.MetricUtils import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry} @@ -1503,7 +1503,7 @@ class JobManager( } case None => - sender() ! Status.Failure(new IllegalStateException(s"Job ${msg.getJobId} not found")) + sender() ! Status.Failure(new FlinkJobNotFoundException(msg.getJobId)) } // TaskManager KvState registration http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index a697aae..6a02d1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -63,6 +63,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse; @@ -672,7 +673,7 @@ public class JobManagerTest extends TestLogger { try { Await.result(lookupFuture, deadline.timeLeft()); fail("Did not throw expected Exception"); - } catch (IllegalStateException ignored) { + } catch (FlinkJobNotFoundException ignored) { // Expected } @@ -735,7 +736,7 @@ public class JobManagerTest extends TestLogger { try { Await.result(lookupFuture, deadline.timeLeft()); fail("Did not throw expected Exception"); - } catch (IllegalStateException ignored) { + } catch (FlinkJobNotFoundException ignored) { // Expected }
