Repository: samza Updated Branches: refs/heads/master 11afae3fd -> 187ec5f2f
SAMZA-1718: Simplify management of Zookeeper coordination state 1. Currently, coordination-related state is spread across several Zookeeper classes. There are also back-and-forth flows that exist between the ZkJobCoordinator, ZkControllerImpl, ZkControllerListener and ZkLeaderElector. This PR nukes unnecessary interfaces (and their implementation classes), simplifies state management and unifies state in the ZkJobCoordinator class. 2. Clearly defined life-cycle hooks on events: - Protocol validations happen once during the lifecycle of a StreamProcessor (instead of each new session) - New subscriptions to listeners happen on each new Zk session Author: Jagadish <[email protected]> Reviewers: Prateek M <[email protected]> Closes #525 from vjagadish/zk-simplify Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/187ec5f2 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/187ec5f2 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/187ec5f2 Branch: refs/heads/master Commit: 187ec5f2fc81bab63bef5de28865ea267a4400fc Parents: 11afae3 Author: Jagadish <[email protected]> Authored: Fri Jun 22 18:06:51 2018 -0700 Committer: Jagadish <[email protected]> Committed: Fri Jun 22 18:06:51 2018 -0700 ---------------------------------------------------------------------- .../samza/zk/ZkBarrierForVersionUpgrade.java | 19 +- .../java/org/apache/samza/zk/ZkController.java | 39 ----- .../org/apache/samza/zk/ZkControllerImpl.java | 163 ------------------ .../apache/samza/zk/ZkControllerListener.java | 37 ---- .../org/apache/samza/zk/ZkJobCoordinator.java | 172 ++++++++++++------- .../org/apache/samza/zk/ZkLeaderElector.java | 17 +- .../main/java/org/apache/samza/zk/ZkUtils.java | 70 +++++--- .../apache/samza/zk/TestZkJobCoordinator.java | 19 +- .../java/org/apache/samza/zk/TestZkUtils.java | 13 +- 9 files changed, 189 insertions(+), 360 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java index a2ed823..d0f1caf 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java @@ -168,7 +168,7 @@ public class ZkBarrierForVersionUpgrade { * Listener for changes to the list of participants. It is meant to be subscribed only by the creator of the barrier * node. It checks to see when the barrier is ready to be marked as completed. */ - class ZkBarrierChangeHandler extends ZkUtils.GenIZkChildListener { + class ZkBarrierChangeHandler extends ZkUtils.GenerationAwareZkChildListener { private static final String ACTION_NAME = "ZkBarrierChangeHandler"; private final String barrierVersion; @@ -181,10 +181,7 @@ public class ZkBarrierForVersionUpgrade { } @Override - public void handleChildChange(String barrierParticipantPath, List<String> participantIds) { - if (notAValidEvent()) { - return; - } + public void doHandleChildChange(String barrierParticipantPath, List<String> participantIds) { if (participantIds == null) { LOG.info("Received notification with null participants for barrier: {}. Ignoring it.", barrierParticipantPath); return; @@ -216,7 +213,7 @@ public class ZkBarrierForVersionUpgrade { * Barrier state values are either DONE or TIMED_OUT. It only registers to receive on valid state change notification. * Once a valid state change notification is received, it will un-subscribe from further notifications. 
*/ - class ZkBarrierReachedHandler extends ZkUtils.GenIZkDataListener { + class ZkBarrierReachedHandler extends ZkUtils.GenerationAwareZkDataListener { private final String barrierStatePath; private final String barrierVersion; @@ -227,10 +224,8 @@ public class ZkBarrierForVersionUpgrade { } @Override - public void handleDataChange(String dataPath, Object data) { + public void doHandleDataChange(String dataPath, Object data) { LOG.info(String.format("Received barrierState change notification for barrier version: %s from zkNode: %s with data: %s.", barrierVersion, dataPath, data)); - if (notAValidEvent()) - return; State barrierState = (State) data; List<State> expectedBarrierStates = ImmutableList.of(State.DONE, State.TIMED_OUT); @@ -244,10 +239,8 @@ public class ZkBarrierForVersionUpgrade { } @Override - public void handleDataDeleted(String dataPath) { - LOG.warn("barrier done node got deleted at " + dataPath); - if (notAValidEvent()) - return; + public void doHandleDataDeleted(String path) { + LOG.warn("Data deleted in path: " + path + " barrierVersion: " + barrierVersion); } } http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkController.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java deleted file mode 100644 index de2e473..0000000 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.apache.samza.zk; - -/** - * Api to the functionality provided by ZK - * - * Api for JC to ZK communication - */ -public interface ZkController { - void register(); - boolean isLeader(); - void stop(); - - // Leader - /** - * Allows the {@link ZkJobCoordinator} to subscribe to changes to Zk nodes in the processors subtree - * Typically, the leader is interested in such notifications. - */ - void subscribeToProcessorChange(); -} http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java deleted file mode 100644 index 87d7177..0000000 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.zk; - -import org.apache.samza.SamzaException; -import org.apache.samza.coordinator.LeaderElector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class ZkControllerImpl implements ZkController { - private static final Logger LOG = LoggerFactory.getLogger(ZkControllerImpl.class); - - private final String processorId; - private final ZkUtils zkUtils; - private final ZkControllerListener zkControllerListener; - private final LeaderElector zkLeaderElector; - - public ZkControllerImpl(String processorId, ZkUtils zkUtils, - ZkControllerListener zkControllerListener, LeaderElector zkLeaderElector) { - this.processorId = processorId; - this.zkUtils = zkUtils; - this.zkControllerListener = zkControllerListener; - this.zkLeaderElector = zkLeaderElector; - - init(); - } - - private void init() { - ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder(); - - zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder - .getJobModelPathPrefix()}); - } - - @Override - public void register() { - // TODO - make a loop here with some number of attempts. - // possibly split into two method - becomeLeader() and becomeParticipant() - zkLeaderElector.tryBecomeLeader(); - - // make sure we are connection to a job that uses the same ZK communication protocol version. 
- try { - zkUtils.validateZkVersion(); - } catch (SamzaException e) { - // IMPORTANT: Mismatch of the version, means we are trying to join a job, started by processors with different version. - // If there are no processors running, this is the place to do the migration to the new - // ZK structure. - // If some processors are running, then this processor should fail with an error to tell the user to stop all - // the processors before upgrading to this new version. - // TODO migration here - // for now we just rethrow the exception - throw e; - } - - - // subscribe to JobModel version updates - zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils)); - } - - @Override - public boolean isLeader() { - return zkLeaderElector.amILeader(); - } - - @Override - public void stop() { - try { - if (isLeader()) { - zkLeaderElector.resignLeadership(); - } - } finally { - zkUtils.deleteProcessorNode(processorId); - zkUtils.close(); - } - } - - @Override - public void subscribeToProcessorChange() { - zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler(zkUtils)); - } - - // Only by Leader - class ProcessorChangeHandler extends ZkUtils.GenIZkChildListener { - - public ProcessorChangeHandler(ZkUtils zkUtils) { - super(zkUtils, "ProcessorChangeHandler"); - } - - /** - * Called when the children of the given path changed. - * - * @param parentPath The parent path - * @param currentChildren The children or null if the root node (parent path) was deleted. - * @throws Exception - */ - @Override - public void handleChildChange(String parentPath, List<String> currentChildren) - throws Exception { - if (notAValidEvent()) - return; - - if (currentChildren == null) { - // this may happen only in case of exception in ZK. It happens if the zkNode has been deleted. - // So the notification will pass 'null' as the list of children. Exception should be visible in the logs. - // It makes no sense to pass it further down. 
- LOG.error("handleChildChange on path " + parentPath + " was invoked with NULL list of children"); - return; - } - LOG.info( - "ZkControllerImpl::ProcessorChangeHandler::handleChildChange - Path: " + parentPath + - " Current Children: " + currentChildren); - zkControllerListener.onProcessorChange(currentChildren); - - } - } - - class ZkJobModelVersionChangeHandler extends ZkUtils.GenIZkDataListener { - - public ZkJobModelVersionChangeHandler(ZkUtils zkUtils) { - super(zkUtils, "ZkJobModelVersionChangeHandler"); - } - /** - * Called when there is a change to the data in JobModel version path - * To the subscribers, it signifies that a new version of JobModel is available. - */ - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - if (notAValidEvent()) - return; - - LOG.info("pid=" + processorId + ". Got notification on version update change. path=" + dataPath + "; data=" - + data); - zkControllerListener.onNewJobModelAvailable((String) data); - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - if (notAValidEvent()) - return; - - throw new SamzaException("version update path has been deleted!"); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java deleted file mode 100644 index af4d56c..0000000 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.zk; - -import java.util.List; - -/** - * Interface to listen for notifications from the {@link ZkController} - */ -public interface ZkControllerListener { - /** - * ZkController observes the ZkTree for changes to group membership of processors and notifies the listener - * - * @param processorIds List of current znodes that are in the processing group - */ - void onProcessorChange(List<String> processorIds); - - void onNewJobModelAvailable(String version); // start job model update (stop current work) - void onNewJobModelConfirmed(String version); // start new work according to the new model -} http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 23fb3b0..6d85c66 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -19,6 +19,7 @@ package org.apache.samza.zk; import com.google.common.annotations.VisibleForTesting; + import java.util.ArrayList; import java.util.HashSet; import java.util.HashMap; @@ -62,7 +63,7 @@ import org.slf4j.LoggerFactory; /** * 
JobCoordinator for stand alone processor managed via Zookeeper. */ -public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { +public class ZkJobCoordinator implements JobCoordinator { private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinator.class); // TODO: MetadataCache timeout has to be 0 for the leader so that it can always have the latest information associated // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197 @@ -84,7 +85,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { private final ZkUtils zkUtils; private final String processorId; - private final ZkController zkController; private final Config config; private final ZkBarrierForVersionUpgrade barrier; @@ -117,7 +117,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener()); leaderElector = new ZkLeaderElector(processorId, zkUtils); leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl()); - this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector); this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs(); this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId); debounceTimer = new ScheduleAfterDebounceTime(processorId); @@ -132,9 +131,15 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { @Override public void start() { + ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder(); + zkUtils.validateZkVersion(); + zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder + .getJobModelPathPrefix()}); + startMetrics(); systemAdmins.start(); - zkController.register(); + leaderElector.tryBecomeLeader(); + zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils)); } @Override @@ -157,8 
+162,16 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { debounceTimer.stopScheduler(); - LOG.debug("Shutting down ZkController."); - zkController.stop(); + if (leaderElector.amILeader()) { + LOG.info("Resigning leadership for processorId: " + processorId); + leaderElector.resignLeadership(); + } + + LOG.info("Shutting down ZkUtils."); + // close zk connection + if (zkUtils != null) { + zkUtils.close(); + } LOG.debug("Shutting down system admins."); systemAdmins.stop(); @@ -212,11 +225,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { return processorId; } - //////////////////////////////////////////////// LEADER stuff /////////////////////////// - @Override + /* + * The leader handles notifications for two types of events: + * 1. Changes to the current set of processors in the group. + * 2. Changes to the set of participants who have subscribed the the barrier + */ public void onProcessorChange(List<String> processors) { if (leaderElector.amILeader()) { - LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size()); + LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed. List size=" + processors.size()); debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> doOnProcessorChange(processors)); } } @@ -267,42 +283,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { debounceTimer.scheduleAfterDebounceTime(ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE)); } - @Override - public void onNewJobModelAvailable(final String version) { - debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> - { - LOG.info("pid=" + processorId + ": new JobModel available"); - // get the new job model from ZK - newJobModel = zkUtils.getJobModel(version); - LOG.info("pid=" + processorId + ": new JobModel available. 
ver=" + version + "; jm = " + newJobModel); - - if (!newJobModel.getContainers().containsKey(processorId)) { - LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}", - processorId, newJobModel); - stop(); - } else { - // stop current work - if (coordinatorListener != null) { - coordinatorListener.onJobModelExpired(); - } - // update ZK and wait for all the processors to get this new version - barrier.join(version, processorId); - } - }); - } - - @Override - public void onNewJobModelConfirmed(String version) { - LOG.info("pid=" + processorId + "new version " + version + " of the job model got confirmed"); - // get the new Model - JobModel jobModel = getJobModel(); - - // start the container with the new model - if (coordinatorListener != null) { - coordinatorListener.onNewJobModel(processorId, jobModel); - } - } - private String createProcessorId(Config config) { // TODO: This check to be removed after 0.13+ ApplicationConfig appConfig = new ApplicationConfig(config); @@ -319,17 +299,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { } } - private List<String> getActualProcessorIds(List<String> processors) { - if (processors.size() > 0) { - // we should use this list - // but it needs to be converted into PIDs, which is part of the data - return zkUtils.getActiveProcessorsIDs(processors); - } else { - // get the current list of processors - return zkUtils.getSortedActiveProcessorsIDs(); - } - } - /** * Generate new JobModel when becoming a leader or the list of processor changed. 
*/ @@ -354,11 +323,10 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { class LeaderElectorListenerImpl implements LeaderElectorListener { @Override public void onBecomingLeader() { - LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!"); + LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader"); metrics.isLeader.set(true); - zkController.subscribeToProcessorChange(); - debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> - { + zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler(zkUtils)); + debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> { // actual actions to do are the same as onProcessorChange doOnProcessorChange(new ArrayList<>()); }); @@ -386,7 +354,16 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { metrics.barrierStateChange.inc(); metrics.singleBarrierRebalancingTime.update(System.nanoTime() - startTime); if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) { - debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> onNewJobModelConfirmed(version)); + debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> { + LOG.info("pid=" + processorId + "new version " + version + " of the job model got confirmed"); + + // read the new Model + JobModel jobModel = getJobModel(); + // start the container with the new model + if (coordinatorListener != null) { + coordinatorListener.onNewJobModel(processorId, jobModel); + } + }); } else { if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) { // no-op for non-leaders @@ -394,8 +371,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { LOG.warn("Barrier for version " + version + " timed out."); if (leaderElector.amILeader()) { LOG.info("Leader will schedule a new job model generation"); - debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> - { + 
debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> { // actual actions to do are the same as onProcessorChange doOnProcessorChange(new ArrayList<>()); }); @@ -412,6 +388,73 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { } } + class ProcessorChangeHandler extends ZkUtils.GenerationAwareZkChildListener { + + public ProcessorChangeHandler(ZkUtils zkUtils) { + super(zkUtils, "ProcessorChangeHandler"); + } + + /** + * Called when the children of the given path changed. + * + * @param parentPath The parent path + * @param currentChildren The children or null if the root node (parent path) was deleted. + * @throws Exception + */ + @Override + public void doHandleChildChange(String parentPath, List<String> currentChildren) + throws Exception { + if (currentChildren == null) { + LOG.info("handleChildChange on path " + parentPath + " was invoked with NULL list of children"); + } else { + LOG.info("ProcessorChangeHandler::handleChildChange - Path: {} Current Children: {} ", parentPath, currentChildren); + onProcessorChange(currentChildren); + } + } + } + + class ZkJobModelVersionChangeHandler extends ZkUtils.GenerationAwareZkDataListener { + + public ZkJobModelVersionChangeHandler(ZkUtils zkUtils) { + super(zkUtils, "ZkJobModelVersionChangeHandler"); + } + + /** + * Invoked when there is a change to the JobModelVersion z-node. It signifies that a new JobModel version is available. + */ + @Override + public void doHandleDataChange(String dataPath, Object data) { + debounceTimer.scheduleAfterDebounceTime(JOB_MODEL_VERSION_CHANGE, 0, () -> { + String jobModelVersion = (String) data; + + LOG.info("Got a notification for new JobModel version. Path = {} Version = {}", dataPath, data); + + newJobModel = zkUtils.getJobModel(jobModelVersion); + LOG.info("pid=" + processorId + ": new JobModel is available. 
Version =" + jobModelVersion + "; JobModel = " + newJobModel); + + if (!newJobModel.getContainers().containsKey(processorId)) { + LOG.info("New JobModel does not contain pid={}. Stopping this processor. New JobModel: {}", + processorId, newJobModel); + stop(); + } else { + // stop current work + if (coordinatorListener != null) { + coordinatorListener.onJobModelExpired(); + } + // update ZK and wait for all the processors to get this new version + barrier.join(jobModelVersion, processorId); + } + }); + } + + @Override + public void doHandleDataDeleted(String dataPath) { + LOG.warn("JobModel version z-node has been deleted. Shutting down the coordinator" + dataPath); + debounceTimer.scheduleAfterDebounceTime("JOB_MODEL_VERSION_DELETED", 0, () -> stop()); + } + } + + /// listener to handle ZK state change events @VisibleForTesting class ZkSessionStateChangedListener implements IZkStateListener { @@ -479,7 +522,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { LOG.info("Got new session created event for processor=" + processorId); debounceTimer.cancelAllScheduledActions(); LOG.info("register zk controller for the new session"); - zkController.register(); + leaderElector.tryBecomeLeader(); + zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(zkUtils)); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java index c9ee1f0..9171d9d 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java @@ -170,26 +170,19 @@ public class ZkLeaderElector implements LeaderElector { return String.format("[Processor-%s] %s", processorIdStr, 
logMessage); } - // Only by non-leaders - class PreviousProcessorChangeListener extends ZkUtils.GenIZkDataListener { + class PreviousProcessorChangeListener extends ZkUtils.GenerationAwareZkDataListener { public PreviousProcessorChangeListener(ZkUtils zkUtils) { super(zkUtils, "PreviousProcessorChangeListener"); } @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - LOG.debug("Data change on path: " + dataPath + " Data: " + data); - if (notAValidEvent()) - return; + public void doHandleDataChange(String dataPath, Object data) { + LOG.info("Data change on path: {} for data: {}", dataPath, data); } @Override - public void handleDataDeleted(String dataPath) - throws Exception { - LOG.info(zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again...")); - if (notAValidEvent()) { - return; - } + public void doHandleDataDeleted(String dataPath) { + LOG.info(zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader.")); tryBecomeLeader(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index fead167..4d325c5 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -341,60 +341,86 @@ public class ZkUtils { } /** - * Generation enforcing zk listener abstract class. - * It helps listeners, which extend it, to notAValidEvent old generation events. - * We cannot use 'sessionId' for this because it is not available through ZkClient (at leaste without reflection) + * A generation-aware {@link IZkChildListener} that only responds to events that occur in the current-generation. 
+ * Each generation is identified by a generation-id which is scoped to the currently active Zk session and + * is incremented each time a session expires. This ensures that events corresponding to the previous generation + * are not acted on. */ - public abstract static class GenIZkChildListener implements IZkChildListener { + public abstract static class GenerationAwareZkChildListener implements IZkChildListener { private final int generation; private final ZkUtils zkUtils; private final String listenerName; - public GenIZkChildListener(ZkUtils zkUtils, String listenerName) { + public GenerationAwareZkChildListener(ZkUtils zkUtils, String listenerName) { generation = zkUtils.getGeneration(); this.zkUtils = zkUtils; this.listenerName = listenerName; } - protected boolean notAValidEvent() { - int curGeneration = zkUtils.getGeneration(); - if (curGeneration != generation) { - LOG.warn("SKIPPING handleDataChanged for " + listenerName + - " from wrong generation. current generation=" + curGeneration + "; callback generation= " + generation); - return true; + @Override + public void handleChildChange(String barrierParticipantPath, List<String> participantIds) throws Exception { + int currentGeneration = zkUtils.getGeneration(); + if (currentGeneration != generation) { + LOG.warn(String.format("Skipping handleChildChange for %s from wrong generation. Current generation: %s; " + + "Callback generation: %s", listenerName, currentGeneration, generation)); + return; } - return false; + doHandleChildChange(barrierParticipantPath, participantIds); } + + public abstract void doHandleChildChange(String path, List<String> children) throws Exception; } - public abstract static class GenIZkDataListener implements IZkDataListener { + /** + * A generation-aware {@link IZkDataListener} that only responds to events that occur in the current-generation. 
+ * Each generation is identified by a generation-id which is scoped to the currently active Zk session and + * is incremented each time a session expires. This ensures that events corresponding to the previous generation + * are not acted on. + */ + public abstract static class GenerationAwareZkDataListener implements IZkDataListener { private final int generation; private final ZkUtils zkUtils; private final String listenerName; - public GenIZkDataListener(ZkUtils zkUtils, String listenerName) { + public GenerationAwareZkDataListener(ZkUtils zkUtils, String listenerName) { generation = zkUtils.getGeneration(); this.zkUtils = zkUtils; this.listenerName = listenerName; } - protected boolean notAValidEvent() { - int curGeneration = zkUtils.getGeneration(); - if (curGeneration != generation) { - LOG.warn("SKIPPING handleDataChanged for " + listenerName + - " from wrong generation. curGen=" + curGeneration + "; cb gen= " + generation); - return true; + @Override + public void handleDataChange(String path, Object data) { + if (!isValid()) { + LOG.warn(String.format("Skipping handleDataChange for %s from wrong generation. Current generation: %s; " + + "Callback generation: %s", listenerName, zkUtils.getGeneration(), generation)); + } else { + doHandleDataChange(path, data); + } + } + + public void handleDataDeleted(String dataPath) throws Exception { + if (!isValid()) { + LOG.warn(String.format("Skipping handleDataDeleted for %s from wrong generation. 
Current generation: %s; " + + "Callback generation: %s", listenerName, zkUtils.getGeneration(), generation)); + } else { + doHandleDataDeleted(dataPath); } - return false; } + public abstract void doHandleDataChange(String path, Object data); + + public abstract void doHandleDataDeleted(String path); + + private boolean isValid() { + return generation == zkUtils.getGeneration(); + } } /** * subscribe for changes of JobModel version * @param dataListener describe this */ - public void subscribeToJobModelVersionChange(GenIZkDataListener dataListener) { + public void subscribeToJobModelVersionChange(GenerationAwareZkDataListener dataListener) { LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath()); zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener); metrics.subscriptions.inc(); http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java index c8367fb..50d6a42 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java @@ -19,6 +19,9 @@ package org.apache.samza.zk; import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.I0Itec.zkclient.ZkClient; import org.apache.samza.config.MapConfig; import org.apache.samza.job.model.JobModel; @@ -27,7 +30,10 @@ import org.apache.samza.zk.ZkJobCoordinator.ZkSessionStateChangedListener; import org.apache.zookeeper.Watcher; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import static junit.framework.Assert.assertTrue; import 
static org.mockito.Mockito.*; @@ -36,9 +42,10 @@ public class TestZkJobCoordinator { private static final String TEST_JOB_MODEL_VERSION = "1"; @Test - public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() { + public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() throws Exception { ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class); ZkClient mockZkClient = Mockito.mock(ZkClient.class); + CountDownLatch jcShutdownLatch = new CountDownLatch(1); when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT); ZkUtils zkUtils = Mockito.mock(ZkUtils.class); @@ -47,9 +54,17 @@ public class TestZkJobCoordinator { when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>())); ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils)); - zkJobCoordinator.onNewJobModelAvailable(TEST_JOB_MODEL_VERSION); + doAnswer(new Answer<Void>() { + public Void answer(InvocationOnMock invocation) { + jcShutdownLatch.countDown(); + return null; + } + }).when(zkJobCoordinator).stop(); + final ZkJobCoordinator.ZkJobModelVersionChangeHandler zkJobModelVersionChangeHandler = zkJobCoordinator.new ZkJobModelVersionChangeHandler(zkUtils); + zkJobModelVersionChangeHandler.doHandleDataChange("path", TEST_JOB_MODEL_VERSION); verify(zkJobCoordinator, Mockito.atMost(1)).stop(); + assertTrue("Timed out waiting for JobCoordinator to stop", jcShutdownLatch.await(1, TimeUnit.MINUTES)); } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/187ec5f2/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index e49dc13..19a05a6 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -145,23 +145,21 @@ public class TestZkUtils { public void testZKProtocolVersion() { // first time connect, version should be set to ZkUtils.ZK_PROTOCOL_VERSION ZkLeaderElector le = new ZkLeaderElector("1", zkUtils); - ZkControllerImpl zkController = new ZkControllerImpl("1", zkUtils, null, le); - zkController.register(); + zkUtils.validateZkVersion(); + String root = zkUtils.getKeyBuilder().getRootPath(); String ver = (String) zkUtils.getZkClient().readData(root); Assert.assertEquals(ZkUtils.ZK_PROTOCOL_VERSION, ver); // do it again (in case original value was null - zkController = new ZkControllerImpl("1", zkUtils, null, le); - zkController.register(); + zkUtils.validateZkVersion(); ver = (String) zkUtils.getZkClient().readData(root); Assert.assertEquals(ZkUtils.ZK_PROTOCOL_VERSION, ver); // now negative case zkUtils.getZkClient().writeData(root, "2.0"); try { - zkController = new ZkControllerImpl("1", zkUtils, null, le); - zkController.register(); + zkUtils.validateZkVersion(); Assert.fail("Expected to fail because of version mismatch 2.0 vs 1.0"); } catch (SamzaException e) { // expected @@ -178,8 +176,7 @@ public class TestZkUtils { } try { - zkController = new ZkControllerImpl("1", zkUtils, null, le); - zkController.register(); + zkUtils.validateZkVersion(); Assert.fail("Expected to fail because of version mismatch 2.0 vs 3.0"); } catch (SamzaException e) { // expected
