Github user aledsage commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/879#discussion_r149349474
  
    --- Diff: 
policy/src/main/java/org/apache/brooklyn/policy/ha/ElectPrimaryEffector.java ---
    @@ -0,0 +1,440 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.brooklyn.policy.ha;
    +
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.Callable;
    +
    +import org.apache.brooklyn.api.effector.Effector;
    +import org.apache.brooklyn.api.entity.Entity;
    +import org.apache.brooklyn.api.entity.EntityInitializer;
    +import org.apache.brooklyn.api.entity.Group;
    +import org.apache.brooklyn.api.mgmt.Task;
    +import org.apache.brooklyn.core.config.ConfigKeys;
    +import org.apache.brooklyn.core.effector.EffectorBody;
    +import 
org.apache.brooklyn.core.effector.EffectorTasks.EffectorBodyTaskFactory;
    +import org.apache.brooklyn.core.effector.Effectors;
    +import org.apache.brooklyn.core.entity.Attributes;
    +import org.apache.brooklyn.core.entity.Entities;
    +import org.apache.brooklyn.core.entity.EntityInternal;
    +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
    +import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
    +import 
org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic;
    +import org.apache.brooklyn.core.sensor.Sensors;
    +import org.apache.brooklyn.util.collections.MutableList;
    +import org.apache.brooklyn.util.collections.MutableMap;
    +import org.apache.brooklyn.util.core.config.ConfigBag;
    +import org.apache.brooklyn.util.core.task.DynamicTasks;
    +import org.apache.brooklyn.util.core.task.Tasks;
    +import org.apache.brooklyn.util.exceptions.Exceptions;
    +import org.apache.brooklyn.util.exceptions.UserFacingException;
    +import org.apache.brooklyn.util.time.Duration;
    +import org.apache.brooklyn.util.time.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Stopwatch;
    +import com.google.common.collect.Iterables;
    +
    +/**
    +This effector will scan candidates among children or members to determine 
which should be noted as "primary".  
    +The primary is selected from service-up candidates based on a numeric 
weight as a sensor or config on the candidates 
    +(`ha.primary.weight`, unless overridden), with higher weights being 
preferred.  
    +In the case of ties, or a new candidate emerging with a weight higher than 
a current healthy primary, 
    +behaviour can be configured with `primary.selection.mode`.
    +
    +Returns a map containing a message, newPrimary, oldPrimary, and a {@link 
ResultCode} code.
    +*/
    +public class ElectPrimaryEffector implements EntityInitializer, 
ElectPrimaryConfig {
    +
    +    private static final Logger log = 
LoggerFactory.getLogger(ElectPrimaryEffector.class);
    +    
    +    public static enum ResultCode { PRIMARY_UNCHANGED, 
NEW_PRIMARY_ELECTED, NO_PRIMARY_AVAILABLE }
    +    
    +    public static final Effector<Object> EFFECTOR = 
Effectors.effector(Object.class, "electPrimary").
    +        description("Scan to detect whether there is or should be a new 
primary").buildAbstract();
    +    
    +    private final ConfigBag paramsCreationTime;
    +    
    +    public ElectPrimaryEffector(ConfigBag params) {
    +        this.paramsCreationTime = params;
    +    }
    +    
    +    public ElectPrimaryEffector(Map<String,String> params) {
    +        this(ConfigBag.newInstance(params));
    +    }
    +
    +    
    +    // wire up the entity to call the task factory to create the task on 
invocation
    +    
    +    @Override
    +    public void apply(@SuppressWarnings("deprecation") 
org.apache.brooklyn.api.entity.EntityLocal entity) {
    +        
((EntityInternal)entity).getMutableEntityType().addEffector(makeEffector(paramsCreationTime));
    +    }
    +    
    +    public static Effector<Object> makeEffector(ConfigBag params) {
    +        return Effectors.effector(EFFECTOR).impl(new 
EffectorBodyTaskFactory<Object>(new ElectPrimaryEffectorBody(params))).build();
    +    }
    +
    +    protected static class ElectPrimaryEffectorBody extends 
EffectorBody<Object> {
    +        private final ConfigBag paramsCreationTime;
    +
    +        public ElectPrimaryEffectorBody(ConfigBag paramsCreationTime) {
    +            this.paramsCreationTime = paramsCreationTime;
    +        }
    +
    +        // these are the actual tasks we do
    +        
    +        @Override
    +        public Object call(ConfigBag paramsInvocationTime) {
    +            ConfigBag params = 
ConfigBag.newInstanceCopying(paramsCreationTime).copy(paramsInvocationTime);
    +            
    +            try {
    +                Entity newPrimary = DynamicTasks.queue("check primaries", 
new CheckPrimaries(params)).getUnchecked();
    +                
    +                Entity currentActive = getCurrentActive(params);
    +                if (newPrimary==null) {
    +//                    If no primary can be found, the effector will:
    +//                        * add a "primary-election" problem so that 
service state logic, if applicable, will know that the entity is unhealthy
    +//                        * set service up false
    +//                        * if the local entity is expected to be RUNNING, 
it will set actual state to ON_FIRE
    +//                        * if the local entity has no expectation, it 
will set actual state to STOPPED
    +//                        * demote any old primary
    +                    ServiceProblemsLogic.updateProblemsIndicator(entity(), 
"primary", "No primary could be found");
    +                    entity().sensors().set(Sensors.newSensor(Entity.class, 
params.get(PRIMARY_SENSOR_NAME)), null);
    +                    entity().sensors().set(Attributes.SERVICE_UP, false);
    +                    if (Lifecycle.RUNNING.equals( 
entity().getAttribute(Attributes.SERVICE_STATE_EXPECTED) )) {
    +                        
entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
    +                    } else {
    +                        
entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.STOPPED);
    +                    }
    +                    DynamicTasks.queue(Tasks.create("demote 
"+currentActive, new Demote(params)) ).getUnchecked();                
    +                    return MutableMap.of("code", 
ResultCode.NO_PRIMARY_AVAILABLE, "message", "No primary available", "primary", 
null);
    +                }
    +                
    +                if (newPrimary.equals(currentActive)) {
    +                    // If there is a primary and it is unchanged, the 
effector will end.
    +                    return MutableMap.of("code", 
ResultCode.PRIMARY_UNCHANGED, "message", "No change required", "primary", 
newPrimary);
    +                }
    +                
    +                log.info("Detected new primary "+newPrimary+" at 
"+entity()+" (previously had "+currentActive+")");
    +//                If a new primary is detected, the effector will:
    +//                    * set the local entity to the STARTING state
    +//                    * clear any "primary-election" problem
    +//                    * publish the new primary in a sensor called 
`primary` (or the sensor set in `primary.sensor.name`)
    +//                    * cancel any other ongoing promote calls, and if 
there is an ongoing demote call on the entity being promoted, cancel that also
    +//                    * in parallel
    +//                        * invoke `promote` (or the effector called 
`primary.promote.effector.name`) on the local entity or the entity being 
promoted
    +//                        * invoke `demote` (or the effector called 
`primary.promote.effector.name`) on the local entity or the entity being 
demoted, if an entity is being demoted
    +//                    * set service up true
    +//                    * set the local entity to the RUNNING state
    +                
    +                boolean wasRunning = 
entity().sensors().get(Attributes.SERVICE_STATE_ACTUAL) == Lifecycle.RUNNING;
    +                if (wasRunning) {
    +                    log.debug("Transititioning "+entity()+" to starting 
while promoting/demoting");
    +                    ServiceStateLogic.setExpectedState(entity(), 
Lifecycle.STARTING);
    +                }
    +                ServiceProblemsLogic.clearProblemsIndicator(entity(), 
"primary");
    +                entity().sensors().set(Sensors.newSensor(Entity.class, 
params.get(PRIMARY_SENSOR_NAME)), newPrimary);
    +                try {
    +                    // TODO cancel other promote/demote calls
    +        
    +                    promoteAndDemote(params, currentActive, newPrimary);
    +                    
    +                    log.debug("Promoted/demoted primary for "+entity()+", 
now setting service up "+(wasRunning ? "and running" : "(but not setting as 
'running' because it wasn't 'running' before)"));
    +                    entity().sensors().set(Attributes.SERVICE_UP, true);
    +                    if (wasRunning) 
ServiceStateLogic.setExpectedState(entity(), Lifecycle.RUNNING);
    +                    
    +                } catch (Exception e) {
    +                    Exceptions.propagateIfFatal(e);
    +                    log.debug("Error promoting/demoting primary for 
"+entity()+" (rethrowing): "+e);
    +                    ServiceProblemsLogic.updateProblemsIndicator(entity(), 
"primary", e);
    +                    
ServiceStateLogic.setExpectedStateRunningWithErrors(entity());
    +                    Exceptions.propagate(e);
    +                }
    +    
    +                return MutableMap.of("code", 
ResultCode.NEW_PRIMARY_ELECTED, "message", "New primary found", "primary", 
newPrimary);
    +                
    +            } catch (Exception e) {
    +                Exceptions.propagateIfFatal(e);
    +
    +                if (Entities.isNoLongerManaged(entity())) {
    +                    // ignore errors if shutting down
    +                    return "<no-longer-managed>";
    +                }
    +
    +                Lifecycle expected = 
ServiceStateLogic.getExpectedState(entity());
    +                if (expected==Lifecycle.RUNNING || 
expected==Lifecycle.STARTING) {
    +                    // including SelectionModeStrictFailed
    +                    log.warn("Error electing new primary at "+entity()+": 
"+Exceptions.collapseText(e));
    +                    ServiceProblemsLogic.updateProblemsIndicator(entity(), 
"primary", "Error electing primary: "+
    +                        Exceptions.collapseText(e));
    +                    
entity().sensors().set(Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
    +                    
    +                    throw Exceptions.propagateAnnotated("Error electing 
primary (when "+expected.toString().toLowerCase()+")", e);
    +                }
    +                
    +                throw Exceptions.propagateAnnotated("Error electing 
primary (when not starting/running)", e);
    +            }
    +        }
    +
    +        @SuppressWarnings({ "unchecked", "rawtypes" })
    +        protected void promoteAndDemote(ConfigBag params, Entity 
oldPrimary, Entity newPrimary) {
    +            params.configureStringKey("oldPrimary", oldPrimary);
    +            params.configureStringKey("newPrimary", newPrimary);
    +            MutableList<Task> tasks = MutableList.<Task>of();
    +            
    +            if (newPrimary!=null) tasks.append(Tasks.create("promote 
"+newPrimary, new Promote(params)));
    +            else tasks.append(Tasks.warning("No new primary; nothing to 
promote", null, false));
    +            
    +            if (oldPrimary!=null) tasks.append(Tasks.create("demote 
"+oldPrimary, new Demote(params)));
    +            else tasks.append(Tasks.warning("No old primary; nothing to 
demote", null, false));
    +            
    +            log.debug("Running "+tasks);
    +            List<?> result = 
DynamicTasks.queue(Tasks.parallel("promote/demote", (List<Task<?>>)(List) 
tasks)).getUnchecked();
    +            log.debug("Ran "+tasks+", results: "+result);
    +        }
    +        
    +        private Entity getCurrentActive(ConfigBag params) {
    +            return entity().getAttribute(Sensors.newSensor(Entity.class, 
params.get(PRIMARY_SENSOR_NAME)));
    +        }
    +
    +        protected class CheckPrimaries implements Callable<Entity> {
    --- End diff --
    
    It would be nice to be able to unit test this logic directly (rather than 
only in bigger tests that also include the promotion). It is very complicated, 
with many different conditions and varying behaviour (e.g. the `delayForBest` 
is tested in `ElectPrimaryTest`).
    
    The logic is hard to understand, particularly the subtleties of the 
different behaviours. I had to run it in the debugger to convince myself I 
understood it well enough.
    
    A couple of thoughts on that: if we split it into separate election and 
promotion effectors, it might be easier to unit test (fewer permutations, 
because each can be tested separately). Could/should we refactor slightly so 
that tests can call this logic directly?


---

Reply via email to