[FLINK-6635] [test] Fix ClientConnectionTest

The ClientConnectionTest passed even though the logic under test was failing,
because we were expecting an exception and checking for a special word to be
contained in the exception's message. Unfortunately, we generated an
AssertionError with the same word if the actual logic we wanted to test failed.
That caused the test to pass.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3979616
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3979616
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3979616

Branch: refs/heads/master
Commit: e3979616b4cd40db8f96bd661d52c37fcf84d57c
Parents: 392bc71
Author: Till Rohrmann <[email protected]>
Authored: Fri May 19 12:01:51 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Mon May 22 10:32:45 2017 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     | 16 +++-
 .../client/program/StandaloneClusterClient.java |  4 +-
 .../client/program/ClientConnectionTest.java    | 93 ++++----------------
 .../runtime/util/LeaderRetrievalUtils.java      |  2 +-
 .../flink/core/testutils/CommonTestUtils.java   | 22 +++++
 5 files changed, 56 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3979616/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index b081721..e09a0b6 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -48,6 +48,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
@@ -56,6 +57,7 @@ import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
 import org.apache.flink.runtime.net.ConnectionUtils;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
@@ -733,10 +735,16 @@ public abstract class ClusterClient {
         */
        public ActorGateway getJobManagerGateway() throws Exception {
                LOG.debug("Looking up JobManager");
-               return LeaderRetrievalUtils.retrieveLeaderGateway(
-                       
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                       actorSystemLoader.get(),
-                       lookupTimeout);
+
+               try {
+                       return LeaderRetrievalUtils.retrieveLeaderGateway(
+                               
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+                               actorSystemLoader.get(),
+                               lookupTimeout);
+               } catch (LeaderRetrievalException lre) {
+                       throw new FlinkException("Could not connect to the 
leading JobManager. Please check that the " +
+                               "JobManager is running.", lre);
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e3979616/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 7517504..b00e519 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -52,7 +52,7 @@ public class StandaloneClusterClient extends ClusterClient {
 
        @Override
        public String getWebInterfaceURL() {
-               String host = this.getJobManagerAddress().getHostString();
+               String host = getJobManagerAddress().getHostString();
                int port = 
getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT);
                return "http://" +  host + ":" + port;
        }
@@ -70,7 +70,7 @@ public class StandaloneClusterClient extends ClusterClient {
                                throw new RuntimeException("Received the wrong 
reply " + result + " from cluster.");
                        }
                } catch (Exception e) {
-                       throw new RuntimeException("Couldn't retrieve the 
Cluster status.", e);
+                       throw new RuntimeException("Couldn't retrieve the 
cluster status.", e);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3979616/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index fc24a9d..3bfaa95 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -20,14 +20,14 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 
@@ -35,17 +35,16 @@ import static org.junit.Assert.*;
  * This test starts a job client without the JobManager being reachable. It
  * tests for a timely error and a meaningful error message.
  */
-public class ClientConnectionTest {
+public class ClientConnectionTest extends TestLogger {
 
-       private static final long CONNECT_TIMEOUT = 2 * 1000; // 2 seconds
-       private static final long ASK_STARTUP_TIMEOUT = 100 * 1000; // 100 
seconds
-       private static final long MAX_DELAY = 50 * 1000; // less than the 
startup timeout
+       private static final long CONNECT_TIMEOUT = 100L; // 100 ms
+       private static final long ASK_STARTUP_TIMEOUT = 20000L; // 20 seconds
 
        /**
         * Tests the behavior against a LOCAL address where no job manager is 
running.
         */
        @Test
-       public void testExceptionWhenLocalJobManagerUnreachablelocal() {
+       public void testExceptionWhenLocalJobManagerUnreachablelocal() throws 
Exception {
 
                final InetSocketAddress unreachableEndpoint;
                try {
@@ -64,7 +63,7 @@ public class ClientConnectionTest {
         * Tests the behavior against a REMOTE address where no job manager is 
running.
         */
        @Test
-       public void testExceptionWhenRemoteJobManagerUnreachable() {
+       public void testExceptionWhenRemoteJobManagerUnreachable() throws 
Exception {
 
                final InetSocketAddress unreachableEndpoint;
                try {
@@ -79,78 +78,24 @@ public class ClientConnectionTest {
                testFailureBehavior(unreachableEndpoint);
        }
 
-       private void testFailureBehavior(final InetSocketAddress 
unreachableEndpoint) {
+       private static void testFailureBehavior(final InetSocketAddress 
unreachableEndpoint) throws Exception {
 
                final Configuration config = new Configuration();
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
(ASK_STARTUP_TIMEOUT/1000) + " s");
-               config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, 
(CONNECT_TIMEOUT/1000) + " s");
+               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
(ASK_STARTUP_TIMEOUT) + " ms");
+               config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, 
(CONNECT_TIMEOUT) + " ms");
                config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
unreachableEndpoint.getHostName());
                config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
unreachableEndpoint.getPort());
 
+               ClusterClient client = new StandaloneClusterClient(config);
 
                try {
-                       JobVertex vertex = new JobVertex("Test Vertex");
-                       vertex.setInvokableClass(TestInvokable.class);
-
-                       final AtomicReference<Throwable> error = new 
AtomicReference<Throwable>();
-
-                       Thread invoker = new Thread("test invoker") {
-                               @Override
-                               public void run() {
-                                       try {
-                                               new 
StandaloneClusterClient(config);
-                                               fail("This should fail with an 
exception since the JobManager is unreachable.");
-                                       }
-                                       catch (Throwable t) {
-                                               synchronized (error) {
-                                                       error.set(t);
-                                                       error.notifyAll();
-                                               }
-                                       }
-                               }
-                       };
-
-                       invoker.setDaemon(true);
-                       invoker.start();
-
-                       try {
-                               // wait until the caller is successful, for at 
most the given time
-                               long now = System.nanoTime();
-                               long deadline = now + MAX_DELAY * 1_000_000;
-
-                               synchronized (error) {
-                                       while (invoker.isAlive() && error.get() 
== null && now < deadline) {
-                                               error.wait(1000);
-                                               now = System.nanoTime();
-                                       }
-                               }
-
-                               Throwable t = error.get();
-                               if (t == null) {
-                                       fail("Job invocation did not fail in 
expected time interval.");
-                               }
-                               else {
-                                       assertNotNull(t.getMessage());
-                                       assertTrue(t.getMessage(), 
t.getMessage().contains("JobManager"));
-                               }
-                       }
-                       finally {
-                               if (invoker.isAlive()) {
-                                       invoker.interrupt();
-                               }
-                       }
+                       // we have to query the cluster status to start the 
connection attempts
+                       client.getClusterStatus();
+                       fail("This should fail with an exception since the 
endpoint is unreachable.");
+               } catch (Exception e) {
+                       // check that we have failed with a 
LeaderRetrievalException which says that we could
+                       // not connect to the leading JobManager
+                       assertTrue(CommonTestUtils.containsCause(e, 
LeaderRetrievalException.class));
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       public static class TestInvokable extends AbstractInvokable {
-
-               @Override
-               public void invoke() {}
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3979616/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 073c52b..009bec6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -76,7 +76,7 @@ public class LeaderRetrievalUtils {
 
                        return Await.result(actorGatewayFuture, timeout);
                } catch (Exception e) {
-                       throw new LeaderRetrievalException("Could not retrieve 
the leader gateway", e);
+                       throw new LeaderRetrievalException("Could not retrieve 
the leader gateway.", e);
                } finally {
                        try {
                                leaderRetrievalService.stop();

http://git-wip-us.apache.org/repos/asf/flink/blob/e3979616/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index cf2bb7f..33811f2 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -302,4 +302,26 @@ public class CommonTestUtils {
                        throw new RuntimeException("Unclassified error while 
trying to access the sun.misc.Unsafe handle.", t);
                }
        }
+
+       /**
+        * Checks whether the given throwable contains the given cause as a 
cause. The cause is not checked
+        * on equality but on type equality.
+        *
+        * @param throwable Throwable to check for the cause
+        * @param cause Cause to look for
+        * @return True if the given Throwable contains the given cause (type 
equality); otherwise false
+        */
+       public static boolean containsCause(Throwable throwable, Class<? 
extends Throwable> cause) {
+               Throwable current = throwable;
+
+               while (current != null) {
+                       if (cause.isAssignableFrom(current.getClass())) {
+                               return true;
+                       }
+
+                       current = current.getCause();
+               }
+
+               return false;
+       }
 }

Reply via email to