DRILL-4560: When new bits register, invoke listeners in ZKClusterCoordinator

close apache/drill#626

Original JIRA title: “ZKClusterCoordinator does not call
DrillbitStatusListener.drillbitRegistered for new bits”


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4ee1d4c7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4ee1d4c7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4ee1d4c7

Branch: refs/heads/master
Commit: 4ee1d4c7723250714e4b1f0cfbb2cf702a71a24c
Parents: 2c43535
Author: Paul Rogers <[email protected]>
Authored: Wed Oct 19 12:15:12 2016 -0700
Committer: Aman Sinha <[email protected]>
Committed: Fri Oct 21 16:02:59 2016 -0700

----------------------------------------------------------------------
 .../drill/exec/coord/zk/ZKClusterCoordinator.java   | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4ee1d4c7/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
index 4926f9c..51f75c5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
@@ -158,6 +158,7 @@ public class ZKClusterCoordinator extends 
ClusterCoordinator {
     }
   }
 
+  @Override
   public void close() throws Exception {
     // discovery attempts to close its caches(ie serviceCache) already. 
however, being good citizens we make sure to
     // explicitly close serviceCache. Not only that we make sure to close 
serviceCache before discovery to prevent
@@ -231,13 +232,17 @@ public class ZKClusterCoordinator extends 
ClusterCoordinator {
       Set<DrillbitEndpoint> unregisteredBits = new HashSet<>(endpoints);
       unregisteredBits.removeAll(newDrillbitSet);
 
+      // Set of newly live bits : new set of active bits - original bits.
+      Set<DrillbitEndpoint> registeredBits = new HashSet<>(newDrillbitSet);
+      registeredBits.removeAll(endpoints);
+
       endpoints = newDrillbitSet;
 
       if (logger.isDebugEnabled()) {
         StringBuilder builder = new StringBuilder();
         builder.append("Active drillbit set changed.  Now includes ");
         builder.append(newDrillbitSet.size());
-        builder.append(" total bits.  New active drillbits: \n");
+        builder.append(" total bits. New active drillbits:\n");
         for (DrillbitEndpoint bit: newDrillbitSet) {
           builder.append('\t');
           builder.append(bit.getAddress());
@@ -252,11 +257,14 @@ public class ZKClusterCoordinator extends 
ClusterCoordinator {
         logger.debug(builder.toString());
       }
 
-      // Notify the drillbit listener for newly unregistered bits. For now, we 
only care when drillbits are down / unregistered.
-      if (! (unregisteredBits.isEmpty()) ) {
+      // Notify listeners of newly unregistered Drillbits.
+      if (!unregisteredBits.isEmpty()) {
         drillbitUnregistered(unregisteredBits);
       }
-
+      // Notify listeners of newly registered Drillbits.
+      if (!registeredBits.isEmpty()) {
+        drillbitRegistered(registeredBits);
+      }
     } catch (Exception e) {
       logger.error("Failure while update Drillbit service location cache.", e);
     }

Reply via email to