[hotfix] Improve logging and thread characteristics for 'EmbeddedNonHaServices'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44fc46db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44fc46db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44fc46db

Branch: refs/heads/flip-6
Commit: 44fc46dba0dcf91ee0f430f1e37f9f28e49ebbc2
Parents: 62e8e33
Author: Stephan Ewen <[email protected]>
Authored: Fri Dec 2 17:43:10 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../runtime/highavailability/EmbeddedNonHaServices.java     | 7 +++++--
 .../highavailability/nonha/AbstractNonHaServices.java       | 9 +++++++--
 .../highavailability/nonha/EmbeddedLeaderService.java       | 5 ++++-
 .../src/test/resources/log4j-test.properties                | 2 +-
 4 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index 523218e..b91cec1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -56,7 +56,10 @@ public class EmbeddedNonHaServices extends 
AbstractNonHaServices implements High
 
        @Override
        public void shutdown() throws Exception {
-               super.shutdown();
-               resourceManagerLeaderService.shutdown();
+               try {
+                       super.shutdown();
+               } finally {
+                       resourceManagerLeaderService.shutdown();
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 237727f..474faa8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -55,7 +55,7 @@ public abstract class AbstractNonHaServices implements 
HighAvailabilityServices
 
        private final HashMap<JobID, EmbeddedLeaderService> 
jobManagerLeaderServices;
 
-       private final RunningJobsRegistry runningJobsRegistry;
+       private final NonHaRegistry runningJobsRegistry;
 
        private boolean shutdown;
 
@@ -167,8 +167,13 @@ public abstract class AbstractNonHaServices implements 
HighAvailabilityServices
 
                @Override
                public Thread newThread(@Nonnull Runnable r) {
-                       Thread thread = new Thread(r, "Flink HA services thread 
#" + enumerator.incrementAndGet());
+                       Thread thread = new Thread(r, "Flink HA Services Thread 
#" + enumerator.incrementAndGet());
+
+                       // HA threads should have a very high priority, but not
+                       // keep the JVM running by themselves
+                       thread.setPriority(Thread.MAX_PRIORITY);
                        thread.setDaemon(true);
+
                        return thread;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
index 84ac551..9fad9be 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -225,7 +225,7 @@ public class EmbeddedLeaderService {
                                // check if the confirmation is for the same 
grant, or whether it is a stale grant 
                                if (service == currentLeaderProposed && 
currentLeaderSessionId.equals(leaderSessionId)) {
                                        final String address = 
service.contender.getAddress();
-                                       LOG.info("Received confirmation of 
leadership for leader {} / session={}", address, leaderSessionId);
+                                       LOG.info("Received confirmation of 
leadership for leader {} , session={}", address, leaderSessionId);
 
                                        // mark leadership
                                        currentLeaderConfirmed = service;
@@ -271,6 +271,9 @@ public class EmbeddedLeaderService {
                                currentLeaderSessionId = leaderSessionId;
                                currentLeaderProposed = leaderService;
 
+                               LOG.info("Proposing leadership to contender {} 
@ {}",
+                                               leaderService.contender, 
leaderService.contender.getAddress());
+
                                notificationExecutor.execute(
                                                new 
GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/44fc46db/flink-streaming-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/log4j-test.properties 
b/flink-streaming-java/src/test/resources/log4j-test.properties
index 881dc06..e7cd3e0 100644
--- a/flink-streaming-java/src/test/resources/log4j-test.properties
+++ b/flink-streaming-java/src/test/resources/log4j-test.properties
@@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
 
 # A1 uses PatternLayout.
 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.A1.layout.ConversionPattern=%-5r [%-38t] %-5p %-60c %x - %m%n

Reply via email to