[hotfix] Improve logging and thread characteristics for 'EmbeddedNonHaServices'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44fc46db Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44fc46db Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44fc46db Branch: refs/heads/flip-6 Commit: 44fc46dba0dcf91ee0f430f1e37f9f28e49ebbc2 Parents: 62e8e33 Author: Stephan Ewen <[email protected]> Authored: Fri Dec 2 17:43:10 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Dec 5 02:49:43 2016 +0100 ---------------------------------------------------------------------- .../runtime/highavailability/EmbeddedNonHaServices.java | 7 +++++-- .../highavailability/nonha/AbstractNonHaServices.java | 9 +++++++-- .../highavailability/nonha/EmbeddedLeaderService.java | 5 ++++- .../src/test/resources/log4j-test.properties | 2 +- 4 files changed, 17 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java index 523218e..b91cec1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java @@ -56,7 +56,10 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High @Override public void shutdown() throws Exception { - super.shutdown(); - resourceManagerLeaderService.shutdown(); + try { + super.shutdown(); + } finally { + resourceManagerLeaderService.shutdown(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java index 237727f..474faa8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java @@ -55,7 +55,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices; - private final RunningJobsRegistry runningJobsRegistry; + private final NonHaRegistry runningJobsRegistry; private boolean shutdown; @@ -167,8 +167,13 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices @Override public Thread newThread(@Nonnull Runnable r) { - Thread thread = new Thread(r, "Flink HA services thread #" + enumerator.incrementAndGet()); + Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet()); + + // HA threads should have a very high priority, but not + // keep the JVM running by themselves + thread.setPriority(Thread.MAX_PRIORITY); thread.setDaemon(true); + return thread; } } http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java index 84ac551..9fad9be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java @@ -225,7 +225,7 @@ public class EmbeddedLeaderService { // check if the confirmation is for the same grant, or whether it is a stale grant if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) { final String address = service.contender.getAddress(); - LOG.info("Received confirmation of leadership for leader {} / session={}", address, leaderSessionId); + LOG.info("Received confirmation of leadership for leader {} , session={}", address, leaderSessionId); // mark leadership currentLeaderConfirmed = service; @@ -271,6 +271,9 @@ public class EmbeddedLeaderService { currentLeaderSessionId = leaderSessionId; currentLeaderProposed = leaderService; + LOG.info("Proposing leadership to contender {} @ {}", + leaderService.contender, leaderService.contender.getAddress()); + notificationExecutor.execute( new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG)); } http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-streaming-java/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/resources/log4j-test.properties b/flink-streaming-java/src/test/resources/log4j-test.properties index 881dc06..e7cd3e0 100644 --- a/flink-streaming-java/src/test/resources/log4j-test.properties +++ b/flink-streaming-java/src/test/resources/log4j-test.properties @@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +log4j.appender.A1.layout.ConversionPattern=%-5r [%-38t] %-5p %-60c %x - %m%n
