[FLINK-8961][tests] Port JobRetrievalITCase to flip6

This closes #5730.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cc77f9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cc77f9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cc77f9f

Branch: refs/heads/release-1.5
Commit: 2cc77f9f6e999238ae9dd7d24712e5d7a397f4cb
Parents: 5423d0e
Author: zentol <ches...@apache.org>
Authored: Tue Mar 20 15:19:47 2018 +0100
Committer: zentol <ches...@apache.org>
Committed: Mon Apr 16 21:18:32 2018 +0200

----------------------------------------------------------------------
 .../test/example/client/JobRetrievalITCase.java | 121 +++++++-------
 .../client/LegacyJobRetrievalITCase.java        | 162 +++++++++++++++++++
 2 files changed, 224 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cc77f9f/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 57198c0..6b747e0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -21,30 +21,27 @@ package org.apache.flink.test.example.client;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.testutils.category.New;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+import java.util.Optional;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.collection.Seq;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -52,23 +49,41 @@ import static org.junit.Assert.fail;
 /**
  * Tests retrieval of a job from a running Flink cluster.
  */
+@Category(New.class)
 public class JobRetrievalITCase extends TestLogger {
 
        private static final Semaphore lock = new Semaphore(1);
 
-       private static FlinkMiniCluster cluster;
-
-       @BeforeClass
-       public static void before() {
-               Configuration configuration = new Configuration();
-               cluster = new TestingCluster(configuration, false);
-               cluster.start();
+       @ClassRule
+       public static final MiniClusterResource CLUSTER = new 
MiniClusterResource(
+               new MiniClusterResource.MiniClusterResourceConfiguration(
+                       new Configuration(),
+                       1,
+                       4
+               ),
+               MiniClusterResource.MiniClusterType.NEW
+       );
+
+       private RestClusterClient<StandaloneClusterId> client;
+
+       @Before
+       public void setUp() throws Exception {
+               final Configuration clientConfig = new Configuration();
+               clientConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
+               clientConfig.setLong(RestOptions.RETRY_DELAY, 0);
+               clientConfig.addAll(CLUSTER.getClientConfiguration());
+
+               client = new RestClusterClient<>(
+                       clientConfig,
+                       StandaloneClusterId.getInstance()
+               );
        }
 
-       @AfterClass
-       public static void after() {
-               cluster.stop();
-               cluster = null;
+       @After
+       public void tearDown() {
+               if (client != null) {
+                       client.shutdown();
+               }
        }
 
        @Test
@@ -80,64 +95,52 @@ public class JobRetrievalITCase extends TestLogger {
 
                final JobGraph jobGraph = new JobGraph(jobID, "testjob", 
imalock);
 
-               final ClusterClient<StandaloneClusterId> client = new 
StandaloneClusterClient(cluster.configuration(), 
cluster.highAvailabilityServices(), true);
-
                // acquire the lock to make sure that the job cannot complete 
until the job client
                // has been attached in resumingThread
                lock.acquire();
-               client.runDetached(jobGraph, 
JobRetrievalITCase.class.getClassLoader());
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
 
-               final Thread resumingThread = new Thread(new Runnable() {
+               client.setDetached(true);
+               client.submitJob(jobGraph, 
JobRetrievalITCase.class.getClassLoader());
+
+               final CheckedThread resumingThread = new 
CheckedThread("Flink-Job-Retriever") {
                        @Override
-                       public void run() {
-                               try {
-                                       
assertNotNull(client.retrieveJob(jobID));
-                               } catch (Throwable e) {
-                                       error.set(e);
-                               }
+                       public void go() throws Exception {
+                               
assertNotNull(client.requestJobResult(jobID).get());
                        }
-               }, "Flink-Job-Retriever");
-
-               final Seq<ActorSystem> actorSystemSeq = 
cluster.jobManagerActorSystems().get();
-               final ActorSystem actorSystem = actorSystemSeq.last();
-               JavaTestKit testkit = new JavaTestKit(actorSystem);
+               };
 
-               final ActorRef jm = cluster.getJobManagersAsJava().get(0);
-               // wait until client connects
-               
jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), 
testkit.getRef());
-               // confirm registration
-               testkit.expectMsgEquals(true);
+               // wait until the job is running
+               while (client.listJobs().get().isEmpty()) {
+                       Thread.sleep(50);
+               }
 
                // kick off resuming
                resumingThread.start();
 
                // wait for client to connect
-               testkit.expectMsgAllOf(
-                       TestingJobManagerMessages.getClientConnected(),
-                       
TestingJobManagerMessages.getClassLoadingPropsDelivered());
+               while (resumingThread.getState() != Thread.State.WAITING) {
+                       Thread.sleep(10);
+               }
 
                // client has connected, we can release the lock
                lock.release();
 
-               resumingThread.join();
-
-               Throwable exception = error.get();
-               if (exception != null) {
-                       throw new AssertionError(exception);
-               }
+               resumingThread.sync();
        }
 
        @Test
        public void testNonExistingJobRetrieval() throws Exception {
                final JobID jobID = new JobID();
-               ClusterClient<StandaloneClusterId> client = new 
StandaloneClusterClient(cluster.configuration());
 
                try {
-                       client.retrieveJob(jobID);
+                       client.requestJobResult(jobID).get();
                        fail();
-               } catch (JobRetrievalException ignored) {
-                       // this is what we want
+               } catch (Exception exception) {
+                       Optional<Throwable> expectedCause = 
ExceptionUtils.findThrowable(exception,
+                               candidate -> candidate.getMessage() != null && 
candidate.getMessage().contains("Could not find Flink job"));
+                       if (!expectedCause.isPresent()) {
+                               throw exception;
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cc77f9f/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
new file mode 100644
index 0000000..174c90e
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/example/client/LegacyJobRetrievalITCase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.example.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.collection.Seq;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests retrieval of a job from a running Flink cluster.
+ */
+public class LegacyJobRetrievalITCase extends TestLogger {
+
+       private static final Semaphore lock = new Semaphore(1);
+
+       private static FlinkMiniCluster cluster;
+
+       @BeforeClass
+       public static void before() {
+               Configuration configuration = new Configuration();
+               cluster = new TestingCluster(configuration, false);
+               cluster.start();
+       }
+
+       @AfterClass
+       public static void after() {
+               cluster.stop();
+               cluster = null;
+       }
+
+       @Test
+       public void testJobRetrieval() throws Exception {
+               final JobID jobID = new JobID();
+
+               final JobVertex imalock = new JobVertex("imalock");
+               imalock.setInvokableClass(SemaphoreInvokable.class);
+
+               final JobGraph jobGraph = new JobGraph(jobID, "testjob", 
imalock);
+
+               final ClusterClient<StandaloneClusterId> client = new 
StandaloneClusterClient(cluster.configuration(), 
cluster.highAvailabilityServices(), true);
+
+               // acquire the lock to make sure that the job cannot complete 
until the job client
+               // has been attached in resumingThread
+               lock.acquire();
+               client.runDetached(jobGraph, 
LegacyJobRetrievalITCase.class.getClassLoader());
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+
+               final Thread resumingThread = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       
assertNotNull(client.retrieveJob(jobID));
+                               } catch (Throwable e) {
+                                       error.set(e);
+                               }
+                       }
+               }, "Flink-Job-Retriever");
+
+               final Seq<ActorSystem> actorSystemSeq = 
cluster.jobManagerActorSystems().get();
+               final ActorSystem actorSystem = actorSystemSeq.last();
+               JavaTestKit testkit = new JavaTestKit(actorSystem);
+
+               final ActorRef jm = cluster.getJobManagersAsJava().get(0);
+               // wait until client connects
+               
jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), 
testkit.getRef());
+               // confirm registration
+               testkit.expectMsgEquals(true);
+
+               // kick off resuming
+               resumingThread.start();
+
+               // wait for client to connect
+               testkit.expectMsgAllOf(
+                       TestingJobManagerMessages.getClientConnected(),
+                       
TestingJobManagerMessages.getClassLoadingPropsDelivered());
+
+               // client has connected, we can release the lock
+               lock.release();
+
+               resumingThread.join();
+
+               Throwable exception = error.get();
+               if (exception != null) {
+                       throw new AssertionError(exception);
+               }
+       }
+
+       @Test
+       public void testNonExistingJobRetrieval() throws Exception {
+               final JobID jobID = new JobID();
+               ClusterClient<StandaloneClusterId> client = new 
StandaloneClusterClient(cluster.configuration());
+
+               try {
+                       client.retrieveJob(jobID);
+                       fail();
+               } catch (JobRetrievalException ignored) {
+                       // this is what we want
+               }
+       }
+
+       /**
+        * Invokable that waits on {@link #lock} to be released and finishes 
afterwards.
+        *
+        * <p>NOTE: needs to be <tt>public</tt> so that a task can be run with 
this!
+        */
+       public static class SemaphoreInvokable extends AbstractInvokable {
+
+               public SemaphoreInvokable(Environment environment) {
+                       super(environment);
+               }
+
+               @Override
+               public void invoke() throws Exception {
+                       lock.acquire();
+                       lock.release();
+               }
+       }
+
+}

Reply via email to