Github user aledsage commented on a diff in the pull request: https://github.com/apache/brooklyn-server/pull/879#discussion_r149341592 --- Diff: policy/src/main/java/org/apache/brooklyn/policy/ha/ElectPrimaryEffector.java --- @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.ha; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityInitializer; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.effector.EffectorBody; +import org.apache.brooklyn.core.effector.EffectorTasks.EffectorBodyTaskFactory; +import org.apache.brooklyn.core.effector.Effectors; +import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; +import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.UserFacingException; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Iterables; + +/** +This effector will scan candidates among children or members to determine which should be noted as "primary". 
+The primary is selected from service-up candidates based on a numeric weight as a sensor or config on the candidates +(`ha.primary.weight`, unless overridden), with higher weights being preferred. +In the case of ties, or a new candidate emerging with a weight higher than a current healthy primary, +behaviour can be configured with `primary.selection.mode`. + +Returns a map containing a message, newPrimary, oldPrimary, and a {@link ResultCode} code. +*/ +public class ElectPrimaryEffector implements EntityInitializer, ElectPrimaryConfig { + + private static final Logger log = LoggerFactory.getLogger(ElectPrimaryEffector.class); + + public static enum ResultCode { PRIMARY_UNCHANGED, NEW_PRIMARY_ELECTED, NO_PRIMARY_AVAILABLE } + + public static final Effector<Object> EFFECTOR = Effectors.effector(Object.class, "electPrimary"). + description("Scan to detect whether there is or should be a new primary").buildAbstract(); + + private final ConfigBag paramsCreationTime; + + public ElectPrimaryEffector(ConfigBag params) { + this.paramsCreationTime = params; + } + + public ElectPrimaryEffector(Map<String,String> params) { + this(ConfigBag.newInstance(params)); + } + + + // wire up the entity to call the task factory to create the task on invocation + + @Override + public void apply(@SuppressWarnings("deprecation") org.apache.brooklyn.api.entity.EntityLocal entity) { + ((EntityInternal)entity).getMutableEntityType().addEffector(makeEffector(paramsCreationTime)); + } + + public static Effector<Object> makeEffector(ConfigBag params) { + return Effectors.effector(EFFECTOR).impl(new EffectorBodyTaskFactory<Object>(new ElectPrimaryEffectorBody(params))).build(); + } + + protected static class ElectPrimaryEffectorBody extends EffectorBody<Object> { + private final ConfigBag paramsCreationTime; + + public ElectPrimaryEffectorBody(ConfigBag paramsCreationTime) { + this.paramsCreationTime = paramsCreationTime; + } + + // these are the actual tasks we do + + @Override + public Object 
call(ConfigBag paramsInvocationTime) { + ConfigBag params = ConfigBag.newInstanceCopying(paramsCreationTime).copy(paramsInvocationTime); + + try { + Entity newPrimary = DynamicTasks.queue("check primaries", new CheckPrimaries(params)).getUnchecked(); + + Entity currentActive = getCurrentActive(params); + if (newPrimary==null) { +// If no primary can be found, the effector will: +// * add a "primary-election" problem so that service state logic, if applicable, will know that the entity is unhealthy +// * set service up false +// * if the local entity is expected to be RUNNING, it will set actual state to ON_FIRE +// * if the local entity has no expectation, it will set actual state to STOPPED +// * demote any old primary + ServiceProblemsLogic.updateProblemsIndicator(entity(), "primary", "No primary could be found"); + entity().sensors().set(Sensors.newSensor(Entity.class, params.get(PRIMARY_SENSOR_NAME)), null); + entity().sensors().set(Attributes.SERVICE_UP, false); + if (Lifecycle.RUNNING.equals( entity().getAttribute(Attributes.SERVICE_STATE_EXPECTED) )) { + entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE); + } else { + entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.STOPPED); + } + DynamicTasks.queue(Tasks.create("demote "+currentActive, new Demote(params)) ).getUnchecked(); + return MutableMap.of("code", ResultCode.NO_PRIMARY_AVAILABLE, "message", "No primary available", "primary", null); + } + + if (newPrimary.equals(currentActive)) { + // If there is a primary and it is unchanged, the effector will end. 
+ return MutableMap.of("code", ResultCode.PRIMARY_UNCHANGED, "message", "No change required", "primary", newPrimary); + } + + log.info("Detected new primary "+newPrimary+" at "+entity()+" (previously had "+currentActive+")"); +// If a new primary is detected, the effector will: +// * set the local entity to the STARTING state +// * clear any "primary-election" problem +// * publish the new primary in a sensor called `primary` (or the sensor set in `primary.sensor.name`) +// * cancel any other ongoing promote calls, and if there is an ongoing demote call on the entity being promoted, cancel that also +// * in parallel +// * invoke `promote` (or the effector called `primary.promote.effector.name`) on the local entity or the entity being promoted +// * invoke `demote` (or the effector called `primary.promote.effector.name`) on the local entity or the entity being demoted, if an entity is being demoted +// * set service up true +// * set the local entity to the RUNNING state + + boolean wasRunning = entity().sensors().get(Attributes.SERVICE_STATE_ACTUAL) == Lifecycle.RUNNING; + if (wasRunning) { + log.debug("Transititioning "+entity()+" to starting while promoting/demoting"); + ServiceStateLogic.setExpectedState(entity(), Lifecycle.STARTING); + } + ServiceProblemsLogic.clearProblemsIndicator(entity(), "primary"); + entity().sensors().set(Sensors.newSensor(Entity.class, params.get(PRIMARY_SENSOR_NAME)), newPrimary); --- End diff -- I'd lean towards not setting the "primary" sensor here, before we know whether `promoteAndDemote` will succeed: for example, if it fails, the catch block will not revert the sensor's value. Perhaps we should have another sensor that shows the transition in progress (e.g. oldPrimary -> newPrimary). Then, if something goes wrong, someone can look at the sensors to get an idea of what state the system is in. --- If the Brooklyn server is shut down while this is executing, the thread executing `promoteAndDemote` will be interrupted.
The catch block will only call `propagateIfFatal`, so we'll leave the sensors in a state that looks very much as though promotion completed successfully, except that the expected state is still STARTING. I think that's another good argument for giving more indication of what it's doing in a "transition" sensor.
---