DRILL-4560: When new bits register, invoke listeners in ZKClusterCoordinator
close apache/drill#626 Original JIRA title: "ZKClusterCoordinator does not call DrillbitStatusListener.drillbitRegistered for new bits" Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4ee1d4c7 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4ee1d4c7 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4ee1d4c7 Branch: refs/heads/master Commit: 4ee1d4c7723250714e4b1f0cfbb2cf702a71a24c Parents: 2c43535 Author: Paul Rogers <[email protected]> Authored: Wed Oct 19 12:15:12 2016 -0700 Committer: Aman Sinha <[email protected]> Committed: Fri Oct 21 16:02:59 2016 -0700 ---------------------------------------------------------------------- .../drill/exec/coord/zk/ZKClusterCoordinator.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/4ee1d4c7/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java index 4926f9c..51f75c5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java @@ -158,6 +158,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator { } } + @Override public void close() throws Exception { // discovery attempts to close its caches(ie serviceCache) already. however, being good citizens we make sure to // explicitly close serviceCache. 
Not only that we make sure to close serviceCache before discovery to prevent @@ -231,13 +232,17 @@ public class ZKClusterCoordinator extends ClusterCoordinator { Set<DrillbitEndpoint> unregisteredBits = new HashSet<>(endpoints); unregisteredBits.removeAll(newDrillbitSet); + // Set of newly live bits : new set of active bits - original bits. + Set<DrillbitEndpoint> registeredBits = new HashSet<>(newDrillbitSet); + registeredBits.removeAll(endpoints); + endpoints = newDrillbitSet; if (logger.isDebugEnabled()) { StringBuilder builder = new StringBuilder(); builder.append("Active drillbit set changed. Now includes "); builder.append(newDrillbitSet.size()); - builder.append(" total bits. New active drillbits: \n"); + builder.append(" total bits. New active drillbits:\n"); for (DrillbitEndpoint bit: newDrillbitSet) { builder.append('\t'); builder.append(bit.getAddress()); @@ -252,11 +257,14 @@ public class ZKClusterCoordinator extends ClusterCoordinator { logger.debug(builder.toString()); } - // Notify the drillbit listener for newly unregistered bits. For now, we only care when drillbits are down / unregistered. - if (! (unregisteredBits.isEmpty()) ) { + // Notify listeners of newly unregistered Drillbits. + if (!unregisteredBits.isEmpty()) { drillbitUnregistered(unregisteredBits); } - + // Notify listeners of newly registered Drillbits. + if (!registeredBits.isEmpty()) { + drillbitRegistered(registeredBits); + } } catch (Exception e) { logger.error("Failure while update Drillbit service location cache.", e); }
