Github user aledsage commented on a diff in the pull request: https://github.com/apache/brooklyn-server/pull/879#discussion_r149349474 --- Diff: policy/src/main/java/org/apache/brooklyn/policy/ha/ElectPrimaryEffector.java --- @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.ha; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityInitializer; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.effector.EffectorBody; +import org.apache.brooklyn.core.effector.EffectorTasks.EffectorBodyTaskFactory; +import org.apache.brooklyn.core.effector.Effectors; +import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; +import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.UserFacingException; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Iterables; + +/** +This effector will scan candidates among children or members to determine which should be noted as "primary". 
+The primary is selected from service-up candidates based on a numeric weight as a sensor or config on the candidates +(`ha.primary.weight`, unless overridden), with higher weights being preferred. +In the case of ties, or a new candidate emerging with a weight higher than a current healthy primary, +behaviour can be configured with `primary.selection.mode`. + +Returns a map containing a {@link ResultCode} `code`, a `message`, and the `primary` (null if no primary is available). +*/ +public class ElectPrimaryEffector implements EntityInitializer, ElectPrimaryConfig { + + private static final Logger log = LoggerFactory.getLogger(ElectPrimaryEffector.class); + + public static enum ResultCode { PRIMARY_UNCHANGED, NEW_PRIMARY_ELECTED, NO_PRIMARY_AVAILABLE } + + public static final Effector<Object> EFFECTOR = Effectors.effector(Object.class, "electPrimary"). + description("Scan to detect whether there is or should be a new primary").buildAbstract(); + + private final ConfigBag paramsCreationTime; + + public ElectPrimaryEffector(ConfigBag params) { + this.paramsCreationTime = params; + } + + public ElectPrimaryEffector(Map<String,String> params) { + this(ConfigBag.newInstance(params)); + } + + + // wire up the entity to call the task factory to create the task on invocation + + @Override + public void apply(@SuppressWarnings("deprecation") org.apache.brooklyn.api.entity.EntityLocal entity) { + ((EntityInternal)entity).getMutableEntityType().addEffector(makeEffector(paramsCreationTime)); + } + + public static Effector<Object> makeEffector(ConfigBag params) { + return Effectors.effector(EFFECTOR).impl(new EffectorBodyTaskFactory<Object>(new ElectPrimaryEffectorBody(params))).build(); + } + + protected static class ElectPrimaryEffectorBody extends EffectorBody<Object> { + private final ConfigBag paramsCreationTime; + + public ElectPrimaryEffectorBody(ConfigBag paramsCreationTime) { + this.paramsCreationTime = paramsCreationTime; + } + + // these are the actual tasks we do + + @Override + public Object 
call(ConfigBag paramsInvocationTime) { + ConfigBag params = ConfigBag.newInstanceCopying(paramsCreationTime).copy(paramsInvocationTime); + + try { + Entity newPrimary = DynamicTasks.queue("check primaries", new CheckPrimaries(params)).getUnchecked(); + + Entity currentActive = getCurrentActive(params); + if (newPrimary==null) { +// If no primary can be found, the effector will: +// * add a "primary-election" problem so that service state logic, if applicable, will know that the entity is unhealthy +// * set service up false +// * if the local entity is expected to be RUNNING, it will set actual state to ON_FIRE +// * if the local entity has no expectation, it will set actual state to STOPPED +// * demote any old primary + ServiceProblemsLogic.updateProblemsIndicator(entity(), "primary", "No primary could be found"); + entity().sensors().set(Sensors.newSensor(Entity.class, params.get(PRIMARY_SENSOR_NAME)), null); + entity().sensors().set(Attributes.SERVICE_UP, false); + if (Lifecycle.RUNNING.equals( entity().getAttribute(Attributes.SERVICE_STATE_EXPECTED) )) { + entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE); + } else { + entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.STOPPED); + } + DynamicTasks.queue(Tasks.create("demote "+currentActive, new Demote(params)) ).getUnchecked(); + return MutableMap.of("code", ResultCode.NO_PRIMARY_AVAILABLE, "message", "No primary available", "primary", null); + } + + if (newPrimary.equals(currentActive)) { + // If there is a primary and it is unchanged, the effector will end. 
+ return MutableMap.of("code", ResultCode.PRIMARY_UNCHANGED, "message", "No change required", "primary", newPrimary); + } + + log.info("Detected new primary "+newPrimary+" at "+entity()+" (previously had "+currentActive+")"); +// If a new primary is detected, the effector will: +// * set the local entity to the STARTING state +// * clear any "primary-election" problem +// * publish the new primary in a sensor called `primary` (or the sensor set in `primary.sensor.name`) +// * cancel any other ongoing promote calls, and if there is an ongoing demote call on the entity being promoted, cancel that also +// * in parallel +// * invoke `promote` (or the effector called `primary.promote.effector.name`) on the local entity or the entity being promoted +// * invoke `demote` (or the effector called `primary.demote.effector.name`) on the local entity or the entity being demoted, if an entity is being demoted +// * set service up true +// * set the local entity to the RUNNING state + + boolean wasRunning = entity().sensors().get(Attributes.SERVICE_STATE_ACTUAL) == Lifecycle.RUNNING; + if (wasRunning) { + log.debug("Transititioning "+entity()+" to starting while promoting/demoting"); + ServiceStateLogic.setExpectedState(entity(), Lifecycle.STARTING); + } + ServiceProblemsLogic.clearProblemsIndicator(entity(), "primary"); + entity().sensors().set(Sensors.newSensor(Entity.class, params.get(PRIMARY_SENSOR_NAME)), newPrimary); + try { + // TODO cancel other promote/demote calls + + promoteAndDemote(params, currentActive, newPrimary); + + log.debug("Promoted/demoted primary for "+entity()+", now setting service up "+(wasRunning ? 
"and running" : "(but not setting as 'running' because it wasn't 'running' before)")); + entity().sensors().set(Attributes.SERVICE_UP, true); + if (wasRunning) ServiceStateLogic.setExpectedState(entity(), Lifecycle.RUNNING); + + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + log.debug("Error promoting/demoting primary for "+entity()+" (rethrowing): "+e); + ServiceProblemsLogic.updateProblemsIndicator(entity(), "primary", e); + ServiceStateLogic.setExpectedStateRunningWithErrors(entity()); + Exceptions.propagate(e); + } + + return MutableMap.of("code", ResultCode.NEW_PRIMARY_ELECTED, "message", "New primary found", "primary", newPrimary); + + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + + if (Entities.isNoLongerManaged(entity())) { + // ignore errors if shutting down + return "<no-longer-managed>"; + } + + Lifecycle expected = ServiceStateLogic.getExpectedState(entity()); + if (expected==Lifecycle.RUNNING || expected==Lifecycle.STARTING) { + // including SelectionModeStrictFailed + log.warn("Error electing new primary at "+entity()+": "+Exceptions.collapseText(e)); + ServiceProblemsLogic.updateProblemsIndicator(entity(), "primary", "Error electing primary: "+ + Exceptions.collapseText(e)); + entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE); + + throw Exceptions.propagateAnnotated("Error electing primary (when "+expected.toString().toLowerCase()+")", e); + } + + throw Exceptions.propagateAnnotated("Error electing primary (when not starting/running)", e); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + protected void promoteAndDemote(ConfigBag params, Entity oldPrimary, Entity newPrimary) { + params.configureStringKey("oldPrimary", oldPrimary); + params.configureStringKey("newPrimary", newPrimary); + MutableList<Task> tasks = MutableList.<Task>of(); + + if (newPrimary!=null) tasks.append(Tasks.create("promote "+newPrimary, new Promote(params))); + else tasks.append(Tasks.warning("No new primary; 
nothing to promote", null, false)); + + if (oldPrimary!=null) tasks.append(Tasks.create("demote "+oldPrimary, new Demote(params))); + else tasks.append(Tasks.warning("No old primary; nothing to demote", null, false)); + + log.debug("Running "+tasks); + List<?> result = DynamicTasks.queue(Tasks.parallel("promote/demote", (List<Task<?>>)(List) tasks)).getUnchecked(); + log.debug("Ran "+tasks+", results: "+result); + } + + private Entity getCurrentActive(ConfigBag params) { + return entity().getAttribute(Sensors.newSensor(Entity.class, params.get(PRIMARY_SENSOR_NAME))); + } + + protected class CheckPrimaries implements Callable<Entity> { --- End diff -- It would be nice to be able to unit test this logic directly (rather than just in bigger tests that also include the promotion). It is very complicated, with many different conditions and varying behaviour (e.g. the `delayForBest` behaviour, which is tested in `ElectPrimaryTest`). It's hard to understand, particularly the subtleties of the different behaviours. I had to run it in the debugger to convince myself I understood it well enough. A couple of thoughts on that: (1) if we split it into separate election and promotion effectors, maybe it would be easier to unit test (e.g. fewer permutations, because they can be tested separately); (2) could/should we refactor slightly so that this method can be called directly from tests?
---