http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerImpl.java new file mode 100644 index 0000000..8a02fcf --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerImpl.java @@ -0,0 +1,1105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.ha; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.entity.Application; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.rebind.RebindManager; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.ha.HighAvailabilityManager; +import org.apache.brooklyn.api.management.ha.HighAvailabilityMode; +import org.apache.brooklyn.api.management.ha.ManagementNodeState; +import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister; +import org.apache.brooklyn.api.management.ha.MementoCopyMode; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister.Delta; +import org.apache.brooklyn.core.management.ha.BasicMasterChooser.AlphabeticMasterChooser; +import org.apache.brooklyn.core.management.internal.BrooklynObjectManagementMode; +import org.apache.brooklyn.core.management.internal.LocalEntityManager; +import org.apache.brooklyn.core.management.internal.LocationManagerInternal; +import org.apache.brooklyn.core.management.internal.ManagementContextInternal; +import org.apache.brooklyn.core.management.internal.ManagementTransitionMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.BrooklynVersion; +import brooklyn.catalog.internal.BasicBrooklynCatalog; +import brooklyn.catalog.internal.CatalogDto; +import brooklyn.config.BrooklynServerConfig; +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.EntityInternal; +import 
brooklyn.entity.rebind.RebindManagerImpl; +import brooklyn.entity.rebind.persister.BrooklynPersistenceUtils; +import brooklyn.entity.rebind.persister.BrooklynPersistenceUtils.CreateBackupMode; +import brooklyn.entity.rebind.persister.PersistenceActivityMetrics; +import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord; +import brooklyn.entity.rebind.plane.dto.ManagementPlaneSyncRecordImpl; +import brooklyn.entity.rebind.plane.dto.ManagementPlaneSyncRecordImpl.Builder; +import brooklyn.internal.BrooklynFeatureEnablement; +import brooklyn.util.collections.MutableList; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.exceptions.ReferenceWithError; +import brooklyn.util.task.ScheduledTask; +import brooklyn.util.task.Tasks; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; +import com.google.common.collect.Iterables; + +/** + * This is the guts of the high-availability solution in Brooklyn. + * <p> + * Multiple brooklyn nodes can be started to form a single management plane, where one node is + * designated master and the others are "warm standbys". On termination or failure of the master, + * the standbys deterministically decide which standby should become master (see {@link MasterChooser}). + * That standby promotes itself. + * <p> + * The management nodes communicate their health/status via the {@link ManagementPlaneSyncRecordPersister}. + * For example, if using {@link ManagementPlaneSyncRecordPersisterToObjectStore} with a shared blobstore or + * filesystem/NFS mount, then each management-node periodically writes its state. + * This acts as a heartbeat, being read by the other management-nodes. + * <p> + * Promotion to master involves: + * <ol> + * <li>notifying the other management-nodes that it is now master + * <li>calling {@link RebindManager#rebind(ClassLoader, org.apache.brooklyn.api.entity.rebind.RebindExceptionHandler, ManagementNodeState)} to read all persisted entity state, and thus reconstitute the entities. + * </ol> + * <p> + * Future improvements in this area will include brooklyn-managing-brooklyn to decide + promote + * the standby. 
+ * + * @since 0.7.0 + * + * @author aled + */ +@Beta +public class HighAvailabilityManagerImpl implements HighAvailabilityManager { + + public final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newConfigKey(Duration.class, "brooklyn.ha.pollPeriod", + "How often nodes should poll to detect whether master is healthy", Duration.seconds(1)); + public final ConfigKey<Duration> HEARTBEAT_TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "brooklyn.ha.heartbeatTimeout", + "Maximum allowable time for detection of a peer's heartbeat; if no sign of master after this time, " + + "another node may promote itself", Duration.THIRTY_SECONDS); + + @VisibleForTesting /* only used in tests currently */ + public static interface PromotionListener { + public void promotingToMaster(); + } + + private static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityManagerImpl.class); + + private final ManagementContextInternal managementContext; + private volatile String ownNodeId; + private volatile ManagementPlaneSyncRecordPersister persister; + private volatile PromotionListener promotionListener; + private volatile MasterChooser masterChooser = new AlphabeticMasterChooser(); + private volatile Ticker localTickerUtc = new Ticker() { + // strictly not a ticker because returns millis UTC, but it works fine even so + @Override + public long read() { + return System.currentTimeMillis(); + } + }; + private volatile Ticker optionalRemoteTickerUtc = null; + + private volatile Task<?> pollingTask; + private volatile boolean disabled; + private volatile boolean running; + private volatile ManagementNodeState nodeState = ManagementNodeState.INITIALIZING; + private volatile boolean nodeStateTransitionComplete = false; + private volatile long priority = 0; + + private final static int MAX_NODE_STATE_HISTORY = 200; + private final List<Map<String,Object>> nodeStateHistory = MutableList.of(); + + private volatile transient Duration pollPeriodLocalOverride; + private volatile transient Duration heartbeatTimeoutOverride; + + private volatile ManagementPlaneSyncRecord lastSyncRecord; + + private volatile PersistenceActivityMetrics managementStateWritePersistenceMetrics = new PersistenceActivityMetrics(); + private volatile PersistenceActivityMetrics managementStateReadPersistenceMetrics = new PersistenceActivityMetrics(); + private final long startTimeUtc; + + public HighAvailabilityManagerImpl(ManagementContextInternal managementContext) { + this.managementContext = managementContext; + startTimeUtc = localTickerUtc.read(); + } + + @Override + public HighAvailabilityManagerImpl setPersister(ManagementPlaneSyncRecordPersister persister) { + this.persister = checkNotNull(persister, "persister"); + return this; + } + + @Override + public ManagementPlaneSyncRecordPersister getPersister() { + return persister; + } + + protected synchronized Duration getPollPeriod() { + if (pollPeriodLocalOverride!=null) return pollPeriodLocalOverride; + return managementContext.getBrooklynProperties().getConfig(POLL_PERIOD); + } + + /** Overrides {@link #POLL_PERIOD} from brooklyn config, + * including e.g. 
{@link Duration#PRACTICALLY_FOREVER} to disable polling; + * or <code>null</code> to clear a local override */ + public HighAvailabilityManagerImpl setPollPeriod(Duration val) { + this.pollPeriodLocalOverride = val; + if (running) { + registerPollTask(); + } + return this; + } + + public HighAvailabilityManagerImpl setMasterChooser(MasterChooser val) { + this.masterChooser = checkNotNull(val, "masterChooser"); + return this; + } + + public synchronized Duration getHeartbeatTimeout() { + if (heartbeatTimeoutOverride!=null) return heartbeatTimeoutOverride; + return managementContext.getBrooklynProperties().getConfig(HEARTBEAT_TIMEOUT); + } + + /** Overrides {@link #HEARTBEAT_TIMEOUT} from brooklyn config, + * including e.g. {@link Duration#PRACTICALLY_FOREVER} to prevent failover due to heartbeat absence; + * or <code>null</code> to clear a local override */ + public HighAvailabilityManagerImpl setHeartbeatTimeout(Duration val) { + this.heartbeatTimeoutOverride = val; + return this; + } + + /** A ticker that reads in milliseconds, for populating local timestamps. + * Defaults to System.currentTimeMillis(); may be overridden e.g. for testing. */ + public HighAvailabilityManagerImpl setLocalTicker(Ticker val) { + this.localTickerUtc = checkNotNull(val); + return this; + } + + /** A ticker that reads in milliseconds, for overriding remote timestamps. + * Defaults to null which means to use the remote timestamp. + * Only for testing as this records the remote timestamp in the object. + * <p> + * If this is supplied, one must also set {@link ManagementPlaneSyncRecordPersisterToObjectStore#useRemoteTimestampInMemento()}. */ + @VisibleForTesting + public HighAvailabilityManagerImpl setRemoteTicker(Ticker val) { + this.optionalRemoteTickerUtc = val; + return this; + } + + public HighAvailabilityManagerImpl setPromotionListener(PromotionListener val) { + this.promotionListener = checkNotNull(val, "promotionListener"); + return this; + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + public void disabled() { + disabled = true; + ownNodeId = managementContext.getManagementNodeId(); + // this is notionally the master, just not running; see javadoc for more info + stop(ManagementNodeState.MASTER); + + } + + @Override + public void start(HighAvailabilityMode startMode) { + nodeStateTransitionComplete = true; + disabled = false; + running = true; + changeMode(startMode, true, true); + } + + @Override + public void changeMode(HighAvailabilityMode startMode) { + changeMode(startMode, false, false); + } + + @VisibleForTesting + @Beta + public void changeMode(HighAvailabilityMode startMode, boolean preventElectionOnExplicitStandbyMode, boolean failOnExplicitModesIfUnusual) { + if (!running) { + // if was not running then start as disabled mode, then proceed as normal + LOG.info("HA changing mode to "+startMode+" from "+getInternalNodeState()+" when not running, forcing an intermediate start as DISABLED then will convert to "+startMode); + start(HighAvailabilityMode.DISABLED); + } + if (getNodeState()==ManagementNodeState.FAILED || getNodeState()==ManagementNodeState.INITIALIZING) { + if (startMode!=HighAvailabilityMode.DISABLED) { + // if coming from FAILED (or INITIALIZING because we skipped start call) then treat as initializing + setInternalNodeState(ManagementNodeState.INITIALIZING); + } + } + + ownNodeId = managementContext.getManagementNodeId(); + // TODO Small race in that we first check, and then we'll do checkMaster() on first poll, + // so another node could have 
already become master or terminated in that window. + ManagementNodeSyncRecord existingMaster = hasHealthyMaster(); + boolean weAreRecognisedAsMaster = existingMaster!=null && ownNodeId.equals(existingMaster.getNodeId()); + boolean weAreMasterLocally = getInternalNodeState()==ManagementNodeState.MASTER; + + // catch error in some tests where mgmt context has a different mgmt context + if (managementContext.getHighAvailabilityManager()!=this) + throw new IllegalStateException("Cannot start an HA manager on a management context with a different HA manager!"); + + if (weAreMasterLocally) { + // demotion may be required; do this before triggering an election + switch (startMode) { + case MASTER: + case AUTO: + case DISABLED: + // no action needed, will do anything necessary below (or above) + break; + case HOT_STANDBY: + case HOT_BACKUP: + case STANDBY: + demoteTo(ManagementNodeState.of(startMode).get()); break; + default: + throw new IllegalStateException("Unexpected high availability mode "+startMode+" requested for "+this); + } + } + + ManagementNodeState oldState = getInternalNodeState(); + + // now do election + switch (startMode) { + case AUTO: + // don't care; let's start and see if we promote ourselves + if (getInternalNodeState()==ManagementNodeState.INITIALIZING) { + setInternalNodeState(ManagementNodeState.STANDBY); + } + publishAndCheck(true); + switch (getInternalNodeState()) { + case HOT_BACKUP: + if (!nodeStateTransitionComplete) throw new IllegalStateException("Cannot switch to AUTO when in the middle of a transition to "+getInternalNodeState()); + // else change us to standby, desiring to go to hot standby, and continue to below + setInternalNodeState(ManagementNodeState.STANDBY); + startMode = HighAvailabilityMode.HOT_BACKUP; + case HOT_STANDBY: + case STANDBY: + if (getInternalNodeState()==ManagementNodeState.STANDBY && oldState==ManagementNodeState.INITIALIZING && startMode!=HighAvailabilityMode.HOT_BACKUP + && BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY)) { + // auto requested; not promoted; so it should become hot standby + startMode = HighAvailabilityMode.HOT_STANDBY; + } + ManagementPlaneSyncRecord newState = loadManagementPlaneSyncRecord(true); + String masterNodeId = newState.getMasterNodeId(); + ManagementNodeSyncRecord masterNodeDetails = newState.getManagementNodes().get(masterNodeId); + LOG.info("Management node "+ownNodeId+" running as HA " + getInternalNodeState() + " autodetected" + + (startMode == HighAvailabilityMode.HOT_STANDBY || startMode == HighAvailabilityMode.HOT_BACKUP ? + " (will change to "+startMode+")" : "") + + ", " + + (Strings.isBlank(masterNodeId) ? "no master currently (other node should promote itself soon)" : "master " + + (existingMaster==null ? "(new) " : "") + + "is "+masterNodeId + + (masterNodeDetails==null || masterNodeDetails.getUri()==null ? 
" (no url)" : " at "+masterNodeDetails.getUri()))); + break; + case MASTER: + LOG.info("Management node "+ownNodeId+" running as HA MASTER autodetected"); + break; + default: + throw new IllegalStateException("Management node "+ownNodeId+" set to HA AUTO, encountered unexpected mode "+getInternalNodeState()); + } + break; + case MASTER: + if (!failOnExplicitModesIfUnusual || existingMaster==null) { + promoteToMaster(); + if (existingMaster!=null) { + LOG.info("Management node "+ownNodeId+" running as HA MASTER explicitly"); + } else { + LOG.info("Management node "+ownNodeId+" running as HA MASTER explicitly, stealing from "+existingMaster); + } + } else if (!weAreRecognisedAsMaster) { + throw new IllegalStateException("Master already exists; cannot run as master (master "+existingMaster.toVerboseString()+"); " + + "to trigger a promotion, set a priority and demote the current master"); + } else { + LOG.info("Management node "+ownNodeId+" already running as HA MASTER, when set explicitly"); + } + break; + case HOT_BACKUP: + setInternalNodeState(ManagementNodeState.HOT_BACKUP); + // then continue into next block + case STANDBY: + case HOT_STANDBY: + if (startMode!=HighAvailabilityMode.HOT_BACKUP) { + if (ManagementNodeState.isHotProxy(getInternalNodeState()) && startMode==HighAvailabilityMode.HOT_STANDBY) { + // if was hot_backup, we can immediately go hot_standby + setInternalNodeState(ManagementNodeState.HOT_STANDBY); + } else { + // from any other state, set standby, then perhaps switch to hot_standby later on (or might become master in the next block) + setInternalNodeState(ManagementNodeState.STANDBY); + } + } + if (ManagementNodeState.isStandby(getInternalNodeState())) { + if (!preventElectionOnExplicitStandbyMode) { + publishAndCheck(true); + } + if (failOnExplicitModesIfUnusual && existingMaster==null) { + LOG.error("Management node "+ownNodeId+" detected no master when "+startMode+" requested and existing master required; failing."); + throw new IllegalStateException("No existing master; cannot start as "+startMode); + } + } + String message = "Management node "+ownNodeId+" running as HA "+getNodeState()+" ("; + if (getNodeState().toString().equals(startMode.toString())) + message += "explicitly requested"; + else if (startMode==HighAvailabilityMode.HOT_STANDBY && getNodeState()==ManagementNodeState.STANDBY) + message += "caller requested "+startMode+", will attempt rebind for HOT_STANDBY next"; + else + message += "caller requested "+startMode; + + if (getNodeState()==ManagementNodeState.MASTER) { + message += " but election re-promoted this node)"; + } else { + ManagementPlaneSyncRecord newState = loadManagementPlaneSyncRecord(true); + if (Strings.isBlank(newState.getMasterNodeId())) { + message += "); no master currently"; + if (startMode != HighAvailabilityMode.HOT_BACKUP) message += " (subsequent election may repair)"; + } else { + message += "); master "+newState.getMasterNodeId(); + } + } + LOG.info(message); + break; + case DISABLED: + // safe just to run even if we weren't master + LOG.info("Management node "+ownNodeId+" HA DISABLED (was "+getInternalNodeState()+")"); + demoteTo(ManagementNodeState.FAILED); + if (pollingTask!=null) pollingTask.cancel(true); + break; + default: + throw new IllegalStateException("Unexpected high availability mode "+startMode+" requested for "+this); + } + + if ((startMode==HighAvailabilityMode.HOT_STANDBY || startMode==HighAvailabilityMode.HOT_BACKUP)) { + if (!ManagementNodeState.isHotProxy(oldState)) { + // now transition to hot proxy + 
nodeStateTransitionComplete = false; + if (startMode==HighAvailabilityMode.HOT_STANDBY) { + // if it should be hot standby, then we may need to promote + // inform the world that we are transitioning (but not eligible for promotion while going in to hot standby) + // (no harm in doing this twice) + publishHealth(); + } + try { + activateHotProxy(ManagementNodeState.of(startMode).get()).get(); + // error above now throws + nodeStateTransitionComplete = true; + publishHealth(); + + if (getNodeState()==ManagementNodeState.HOT_STANDBY || getNodeState()==ManagementNodeState.HOT_BACKUP) { + LOG.info("Management node "+ownNodeId+" now running as HA "+getNodeState()+"; " + + managementContext.getApplications().size()+" application"+Strings.s(managementContext.getApplications().size())+" loaded"); + } else { + // shouldn't come here, we should have gotten an error above + LOG.warn("Management node "+ownNodeId+" unable to promote to "+startMode+" (currently "+getNodeState()+"); " + + "(see log for further details)"); + } + } catch (Exception e) { + LOG.warn("Management node "+ownNodeId+" unable to promote to "+startMode+" (currently "+getNodeState()+"); rethrowing: "+Exceptions.collapseText(e)); + nodeStateTransitionComplete = true; + throw Exceptions.propagate(e); + } + } else { + // transitioning among hot proxy states - tell the rebind manager + managementContext.getRebindManager().stopReadOnly(); + managementContext.getRebindManager().startReadOnly(ManagementNodeState.of(startMode).get()); + nodeStateTransitionComplete = true; + } + } else { + nodeStateTransitionComplete = true; + } + if (startMode!=HighAvailabilityMode.DISABLED) + registerPollTask(); + } + + @Override + public void setPriority(long priority) { + this.priority = priority; + if (persister!=null) publishHealth(); + } + + @Override + public long getPriority() { + return priority; + } + + @Override + public void stop() { + LOG.debug("Stopping "+this); + stop(ManagementNodeState.TERMINATED); + } + + private void stop(ManagementNodeState newState) { + boolean wasRunning = running; + + running = false; + setInternalNodeState(newState); + if (pollingTask != null) pollingTask.cancel(true); + + if (wasRunning) { + try { + publishHealth(); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + LOG.error("Problem publishing manager-node health on termination (continuing)", e); + } + } + } + + /** returns the node state this node is trying to be in */ + public ManagementNodeState getTransitionTargetNodeState() { + return getInternalNodeState(); + } + + protected ManagementNodeState getInternalNodeState() { + return nodeState; + } + + protected void setInternalNodeState(ManagementNodeState newState) { + ManagementNodeState oldState = getInternalNodeState(); + synchronized (nodeStateHistory) { + if (this.nodeState != newState) { + nodeStateHistory.add(0, MutableMap.<String,Object>of("state", newState, "timestamp", currentTimeMillis())); + while (nodeStateHistory.size()>MAX_NODE_STATE_HISTORY) { + nodeStateHistory.remove(nodeStateHistory.size()-1); + } + } + ((RebindManagerImpl)managementContext.getRebindManager()).setAwaitingInitialRebind(running && + (ManagementNodeState.isHotProxy(newState) || newState==ManagementNodeState.MASTER)); + this.nodeState = newState; + } + + if (ManagementNodeState.isHotProxy(oldState) && !ManagementNodeState.isHotProxy(newState)) { + // could perhaps promote standby items on some transitions; but for now we stop the old read-only and re-load them + // TODO ideally there'd be an incremental rebind as well 
as an incremental persist + managementContext.getRebindManager().stopReadOnly(); + clearManagedItems(ManagementTransitionMode.transitioning(BrooklynObjectManagementMode.LOADED_READ_ONLY, BrooklynObjectManagementMode.UNMANAGED_PERSISTED)); + } + } + + @Override + public ManagementNodeState getNodeState() { + ManagementNodeState myNodeState = getInternalNodeState(); + if (myNodeState==ManagementNodeState.FAILED) return getInternalNodeState(); + // if target is master then we claim already being master, to prevent other nodes from taking it + // (we may fail subsequently of course) + if (myNodeState==ManagementNodeState.MASTER) return myNodeState; + + if (!nodeStateTransitionComplete) return ManagementNodeState.INITIALIZING; + return myNodeState; + } + + public ManagementPlaneSyncRecord getLastManagementPlaneSyncRecord() { + return lastSyncRecord; + } + + @SuppressWarnings("unchecked") + protected void registerPollTask() { + final Runnable job = new Runnable() { + @Override public void run() { + try { + publishAndCheck(false); + } catch (Exception e) { + if (running) { + LOG.error("Problem in HA-poller: "+e, e); + } else { + if (LOG.isDebugEnabled()) LOG.debug("Problem in HA-poller, but no longer running: "+e, e); + } + } catch (Throwable t) { + LOG.error("Problem in HA-poller: "+t, t); + throw Exceptions.propagate(t); + } + } + }; + Callable<Task<?>> taskFactory = new Callable<Task<?>>() { + @Override public Task<?> call() { + return Tasks.builder().dynamic(false).body(job).name("HA poller task").tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) + .description("polls HA status to see whether this node should promote").build(); + } + }; + + LOG.debug("Registering poll task for "+this+", period "+getPollPeriod()); + if (getPollPeriod().equals(Duration.PRACTICALLY_FOREVER)) { + // don't schedule - used for tests + // (scheduling fires off one initial task in the background before the delay, + // which affects tests that want to know exactly when publishing happens; + // TODO would be nice if scheduled task had a "no initial submission" flag ) + } else { + if (pollingTask!=null) pollingTask.cancel(true); + + ScheduledTask task = new ScheduledTask(MutableMap.of("period", getPollPeriod(), "displayName", "scheduled:[HA poller task]"), taskFactory); + pollingTask = managementContext.getExecutionManager().submit(task); + } + } + + /** invoked manually when initializing, and periodically thereafter */ + @VisibleForTesting + public synchronized void publishAndCheck(boolean initializing) { + publishHealth(); + checkMaster(initializing); + } + + protected synchronized void publishHealth() { + if (persister == null) { + LOG.info("Cannot publish management-node health as no persister"); + return; + } + + Stopwatch timer = Stopwatch.createStarted(); + try { + ManagementNodeSyncRecord memento = createManagementNodeSyncRecord(false); + Delta delta = ManagementPlaneSyncRecordDeltaImpl.builder().node(memento).build(); + persister.delta(delta); + managementStateWritePersistenceMetrics.noteSuccess(Duration.of(timer)); + if (LOG.isTraceEnabled()) LOG.trace("Published management-node health: {}", memento); + } catch (Throwable t) { + managementStateWritePersistenceMetrics.noteFailure(Duration.of(timer)); + managementStateWritePersistenceMetrics.noteError(t.toString()); + LOG.debug("Error publishing management-node health (rethrowing): "+t); + throw Exceptions.propagate(t); + } + } + + public void publishClearNonMaster() { + ManagementPlaneSyncRecord plane = getLastManagementPlaneSyncRecord(); + if (plane==null || 
persister==null) { + LOG.warn("Cannot clear HA node records; HA not active (or not yet loaded)"); + return; + } + org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordDeltaImpl.Builder db = ManagementPlaneSyncRecordDeltaImpl.builder(); + for (Map.Entry<String,ManagementNodeSyncRecord> node: plane.getManagementNodes().entrySet()) { + // only keep a node if it both claims master and is recognised as master; + // else ex-masters who died are kept around! + if (!ManagementNodeState.MASTER.equals(node.getValue().getStatus()) || + !Objects.equal(plane.getMasterNodeId(), node.getValue().getNodeId())) { + db.removedNodeId(node.getKey()); + } + } + persister.delta(db.build()); + // then get, so model is updated + loadManagementPlaneSyncRecord(true); + } + + protected synchronized void publishDemotion(boolean demotingFromMaster) { + checkState(getNodeState() != ManagementNodeState.MASTER, "node status must not be master when demoting", getNodeState()); + + if (persister == null) { + LOG.info("Cannot publish management-node health as no persister"); + return; + } + + ManagementNodeSyncRecord memento = createManagementNodeSyncRecord(false); + ManagementPlaneSyncRecordDeltaImpl.Builder deltaBuilder = ManagementPlaneSyncRecordDeltaImpl.builder() + .node(memento); + if (demotingFromMaster) { + deltaBuilder.clearMaster(ownNodeId); + } + + Delta delta = deltaBuilder.build(); + persister.delta(delta); + if (LOG.isTraceEnabled()) LOG.trace("Published management-node health: {}", memento); + } + + /** + * Publishes (via {@link #persister}) the state of this management node with itself set to master. + */ + protected synchronized void publishPromotionToMaster() { + checkState(getNodeState() == ManagementNodeState.MASTER, "node status must be master on publish, but is %s", getNodeState()); + + if (persister == null) { + LOG.info("Cannot publish management-node health as no persister"); + return; + } + + ManagementNodeSyncRecord memento = createManagementNodeSyncRecord(false); + Delta delta = ManagementPlaneSyncRecordDeltaImpl.builder() + .node(memento) + .setMaster(ownNodeId) + .build(); + persister.delta(delta); + if (LOG.isTraceEnabled()) LOG.trace("Published management-node health: {}", memento); + } + + protected boolean isHeartbeatOk(ManagementNodeSyncRecord masterNode, ManagementNodeSyncRecord meNode) { + if (masterNode==null) return false; + if (meNode==null) { + // we can't confirm it's healthy, but it appears so as far as we can tell + return true; + } + Long timestampMaster = masterNode.getRemoteTimestamp(); + Long timestampMe = meNode.getRemoteTimestamp(); + if (timestampMaster==null || timestampMe==null) return false; + return (timestampMe - timestampMaster) <= getHeartbeatTimeout().toMilliseconds(); + } + + protected ManagementNodeSyncRecord hasHealthyMaster() { + ManagementPlaneSyncRecord memento = loadManagementPlaneSyncRecord(false); + + String nodeId = memento.getMasterNodeId(); + ManagementNodeSyncRecord masterMemento = (nodeId == null) ? null : memento.getManagementNodes().get(nodeId); + + ManagementNodeSyncRecord ourMemento = memento.getManagementNodes().get(ownNodeId); + boolean result = masterMemento != null && masterMemento.getStatus() == ManagementNodeState.MASTER + && isHeartbeatOk(masterMemento, ourMemento); + + if (LOG.isDebugEnabled()) LOG.debug("Healthy-master check result={}; masterId={}; masterMemento={}; ourMemento={}", + new Object[] {result, nodeId, (masterMemento == null ? "<none>" : masterMemento.toVerboseString()), (ourMemento == null ? 
"<none>" : ourMemento.toVerboseString())}); + + return (result ? masterMemento : null); + } + + /** + * Looks up the state of all nodes in the management plane, and checks if the master is still ok. + * If it's not then determines which node should be promoted to master. If it is ourself, then promotes. + */ + protected void checkMaster(boolean initializing) { + ManagementPlaneSyncRecord memento = loadManagementPlaneSyncRecord(false); + + if (getNodeState()==ManagementNodeState.FAILED || getNodeState()==ManagementNodeState.HOT_BACKUP) { + // if failed or hot backup then we can't promote ourselves, so no point in checking who is master + return; + } + + String currMasterNodeId = memento.getMasterNodeId(); + ManagementNodeSyncRecord currMasterNodeRecord = memento.getManagementNodes().get(currMasterNodeId); + ManagementNodeSyncRecord ownNodeRecord = memento.getManagementNodes().get(ownNodeId); + + ManagementNodeSyncRecord newMasterNodeRecord = null; + boolean demotingSelfInFavourOfOtherMaster = false; + + if (currMasterNodeRecord != null && currMasterNodeRecord.getStatus() == ManagementNodeState.MASTER && isHeartbeatOk(currMasterNodeRecord, ownNodeRecord)) { + // master seems healthy + if (ownNodeId.equals(currMasterNodeId)) { + if (LOG.isTraceEnabled()) LOG.trace("Existing master healthy (us): master={}", currMasterNodeRecord.toVerboseString()); + return; + } else { + if (ownNodeRecord!=null && ownNodeRecord.getStatus() == ManagementNodeState.MASTER) { + LOG.error("Management node "+ownNodeId+" detected master change, stolen from us, deferring to "+currMasterNodeId); + newMasterNodeRecord = currMasterNodeRecord; + demotingSelfInFavourOfOtherMaster = true; + } else { + if (LOG.isTraceEnabled()) LOG.trace("Existing master healthy (remote): master={}", currMasterNodeRecord.toVerboseString()); + return; + } + } + } else if (ownNodeRecord == null || !isHeartbeatOk(ownNodeRecord, ownNodeRecord)) { + // our heartbeats are also out-of-date! perhaps something wrong with persistence? just log, and don't over-react! + if (ownNodeRecord == null) { + LOG.error("No management node memento for self ("+ownNodeId+"); perhaps persister unwritable? " + + "Master ("+currMasterNodeId+") reported failed but no-op as cannot tell conclusively"); + } else { + LOG.error("This management node ("+ownNodeId+") memento heartbeats out-of-date; perhaps perister unwritable? " + + "Master ("+currMasterNodeId+") reported failed but no-op as cannot tell conclusively" + + ": self="+ownNodeRecord.toVerboseString()); + } + return; + } else if (ownNodeId.equals(currMasterNodeId)) { + // we are supposed to be the master, but seem to be unhealthy! + LOG.warn("This management node ("+ownNodeId+") supposed to be master but reportedly unhealthy? " + + "no-op as expect other node to fix: self="+ownNodeRecord.toVerboseString()); + return; + } + + if (demotingSelfInFavourOfOtherMaster) { + LOG.debug("Master-change for this node only, demoting "+ownNodeRecord.toVerboseString()+" in favour of official master "+newMasterNodeRecord.toVerboseString()); + demoteTo( + BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY) ? + ManagementNodeState.HOT_STANDBY : ManagementNodeState.STANDBY); + return; + } else { + LOG.debug("Detected master heartbeat timeout. Initiating a new master election. 
Master was " + currMasterNodeRecord); + } + + // Need to choose a new master + newMasterNodeRecord = masterChooser.choose(memento, getHeartbeatTimeout(), ownNodeId); + + String newMasterNodeId = (newMasterNodeRecord == null) ? null : newMasterNodeRecord.getNodeId(); + URI newMasterNodeUri = (newMasterNodeRecord == null) ? null : newMasterNodeRecord.getUri(); + boolean weAreNewMaster = ownNodeId.equals(newMasterNodeId); + + if (LOG.isDebugEnabled()) { + LOG.debug("Management node master-change required: newMaster={}; oldMaster={}; plane={}, self={}; heartbeatTimeout={}", + new Object[] { + (newMasterNodeRecord == null ? "<none>" : newMasterNodeRecord.toVerboseString()), + (currMasterNodeRecord == null ? currMasterNodeId+" (no memento)": currMasterNodeRecord.toVerboseString()), + memento, + ownNodeRecord.toVerboseString(), + getHeartbeatTimeout() + }); + } + String message = "Management node "+ownNodeId+" detected "; + String currMasterSummary = currMasterNodeId + "(" + (currMasterNodeRecord==null ? "<none>" : timestampString(currMasterNodeRecord.getRemoteTimestamp())) + ")"; + if (weAreNewMaster && (ownNodeRecord.getStatus() == ManagementNodeState.MASTER)) { + LOG.warn(message + "we must reassert master status, as was stolen and then failed at "+ + (currMasterNodeRecord==null ? "a node which has gone away" : currMasterSummary)); + publishPromotionToMaster(); + publishHealth(); + return; + } + + if (!initializing) { + if (weAreNewMaster) { + message += "we should be master, changing from "; + } + else if (currMasterNodeRecord==null && newMasterNodeId==null) message += "master change attempted but no candidates "; + else message += "master change, from "; + message += currMasterSummary + " to " + + (newMasterNodeId == null ? "<none>" : + (weAreNewMaster ? "us " : "") + + newMasterNodeId + " (" + timestampString(newMasterNodeRecord.getRemoteTimestamp()) + ")" + + (newMasterNodeUri!=null ? 
" "+newMasterNodeUri : "") ); + // always log, if you're looking at a standby node it's useful to see the new master's URL + LOG.info(message); + } + + // New master is ourself: promote + if (weAreNewMaster) { + promoteToMaster(); + } + } + + private static String timestampString(Long remoteTimestamp) { + if (remoteTimestamp==null) return null; + return remoteTimestamp+" / "+Time.makeTimeStringRounded( Duration.sinceUtc(remoteTimestamp))+" ago"; + } + + protected void promoteToMaster() { + if (!running) { + LOG.warn("Ignoring promote-to-master request, as HighAvailabilityManager is not running"); + return; + } + + if (promotionListener != null) { + try { + promotionListener.promotingToMaster(); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + LOG.warn("Problem in promption-listener (continuing)", e); + } + } + setInternalNodeState(ManagementNodeState.MASTER); + publishPromotionToMaster(); + try { + managementContext.getRebindManager().rebind(managementContext.getCatalogClassLoader(), null, getInternalNodeState()); + } catch (Exception e) { + LOG.error("Management node "+managementContext.getManagementNodeId()+" enountered problem during rebind when promoting self to master; demoting to FAILED and rethrowing: "+e); + demoteTo(ManagementNodeState.FAILED); + throw Exceptions.propagate(e); + } + managementContext.getRebindManager().start(); + } + + protected void backupOnDemotionIfNeeded() { + if (managementContext.getBrooklynProperties().getConfig(BrooklynServerConfig.PERSISTENCE_BACKUPS_REQUIRED_ON_DEMOTION)) { + BrooklynPersistenceUtils.createBackup(managementContext, CreateBackupMode.DEMOTION, MementoCopyMode.LOCAL); + } + } + + /** @deprecated since 0.7.0, use {@link #demoteTo(ManagementNodeState)} */ @Deprecated + protected void demoteToFailed() { + demoteTo(ManagementNodeState.FAILED); + } + /** @deprecated since 0.7.0, use {@link #demoteTo(ManagementNodeState)} */ @Deprecated + protected void demoteToStandby(boolean hot) { + demoteTo(hot ? ManagementNodeState.HOT_STANDBY : ManagementNodeState.STANDBY); + } + + protected void demoteTo(ManagementNodeState toState) { + if (toState!=ManagementNodeState.FAILED && !running) { + LOG.warn("Ignoring demote-from-master request, as HighAvailabilityManager is no longer running"); + return; + } + boolean wasMaster = (getInternalNodeState() == ManagementNodeState.MASTER); + if (wasMaster) backupOnDemotionIfNeeded(); + // TODO target may be RO ? + ManagementTransitionMode mode = ManagementTransitionMode.transitioning( + wasMaster ? 
BrooklynObjectManagementMode.MANAGED_PRIMARY : BrooklynObjectManagementMode.LOADED_READ_ONLY, + BrooklynObjectManagementMode.UNMANAGED_PERSISTED); + + nodeStateTransitionComplete = false; + + switch (toState) { + case FAILED: + case HOT_BACKUP: + case STANDBY: + setInternalNodeState(toState); break; + case HOT_STANDBY: + setInternalNodeState(ManagementNodeState.STANDBY); break; + default: + throw new IllegalStateException("Illegal target state: "+toState); + } + onDemotionStopItems(mode); + nodeStateTransitionComplete = true; + publishDemotion(wasMaster); + + if (toState==ManagementNodeState.HOT_BACKUP || toState==ManagementNodeState.HOT_STANDBY) { + nodeStateTransitionComplete = false; + try { + activateHotProxy(toState).get(); + } finally { + nodeStateTransitionComplete = true; + } + publishHealth(); + } + } + + protected void onDemotionStopItems(ManagementTransitionMode mode) { + // stop persistence and remove all apps etc + managementContext.getRebindManager().stopPersistence(); + managementContext.getRebindManager().stopReadOnly(); + clearManagedItems(mode); + + // tasks are cleared as part of unmanaging entities above + } + + /** clears all managed items from the management context; same items destroyed as in the course of a rebind cycle */ + protected void clearManagedItems(ManagementTransitionMode mode) { + // start with the root applications + for (Application app: managementContext.getApplications()) { + if (((EntityInternal)app).getManagementSupport().isDeployed()) { + ((LocalEntityManager)((EntityInternal)app).getManagementContext().getEntityManager()).unmanage(app, mode); + } + } + // for active management, call above will remove recursively at present, + // but for read-only, and if we stop recursively, go through them all + for (Entity entity: managementContext.getEntityManager().getEntities()) { + ((LocalEntityManager)managementContext.getEntityManager()).unmanage(entity, mode); + } + + // again, for locations, call unmanage on parents first + for (Location loc: managementContext.getLocationManager().getLocations()) { + if (loc.getParent()==null) + ((LocationManagerInternal)managementContext.getLocationManager()).unmanage(loc, mode); + } + for (Location loc: managementContext.getLocationManager().getLocations()) { + ((LocationManagerInternal)managementContext.getLocationManager()).unmanage(loc, mode); + } + + ((BasicBrooklynCatalog)managementContext.getCatalog()).reset(CatalogDto.newEmptyInstance("<reset-by-ha-status-change>")); + } + + /** @deprecated since 0.7.0, use {@link #activateHotProxy(ManagementNodeState)} */ @Deprecated + protected boolean attemptHotStandby() { + return activateHotProxy(ManagementNodeState.HOT_STANDBY).getWithoutError(); + } + + /** Starts hot standby or hot backup, in foreground + * <p> + * In the case of the former, the caller is responsible for publishing health afterwards, + * but if it fails, this method will {@link #demoteTo(ManagementNodeState)} {@link ManagementNodeState#FAILED}. 
+ * <p> + * @return whether the requested {@link ManagementNodeState} was possible; + * (if not, errors should be stored elsewhere), callers may want to rethrow */ + protected ReferenceWithError<Boolean> activateHotProxy(ManagementNodeState toState) { + try { + Preconditions.checkState(nodeStateTransitionComplete==false, "Must be in transitioning state to go into "+toState); + setInternalNodeState(toState); + managementContext.getRebindManager().startReadOnly(toState); + + return ReferenceWithError.newInstanceWithoutError(true); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + LOG.warn("Unable to change "+ownNodeId+" to "+toState+", switching to FAILED: "+e, e); + demoteTo(ManagementNodeState.FAILED); + return ReferenceWithError.newInstanceThrowingError(false, e); + } + } + + @Override + public ManagementPlaneSyncRecord loadManagementPlaneSyncRecord(boolean useLocalKnowledgeForThisNode) { + ManagementPlaneSyncRecord record = loadManagementPlaneSyncRecordInternal(useLocalKnowledgeForThisNode); + lastSyncRecord = record; + return record; + } + + private ManagementPlaneSyncRecord loadManagementPlaneSyncRecordInternal(boolean useLocalKnowledgeForThisNode) { + if (disabled) { + // if HA is disabled, then we are the only node - no persistence; just load a memento to describe this node + Builder builder = ManagementPlaneSyncRecordImpl.builder() + .node(createManagementNodeSyncRecord(true)); + if (getTransitionTargetNodeState() == ManagementNodeState.MASTER) { + builder.masterNodeId(ownNodeId); + } + return builder.build(); + } + if (persister == null) { + // e.g. web-console may be polling before we've started up + LOG.debug("High availablity manager has no persister; returning empty record"); + return ManagementPlaneSyncRecordImpl.builder().build(); + } + + int maxLoadAttempts = 5; + Exception lastException = null; + Stopwatch timer = Stopwatch.createStarted(); + + for (int i = 0; i < maxLoadAttempts; i++) { + try { + ManagementPlaneSyncRecord result = persister.loadSyncRecord(); + + if (useLocalKnowledgeForThisNode) { + // Report this node's most recent state, and detect AWOL nodes + ManagementNodeSyncRecord me = BasicManagementNodeSyncRecord.builder() + .from(result.getManagementNodes().get(ownNodeId), true) + .from(createManagementNodeSyncRecord(false), true) + .build(); + Iterable<ManagementNodeSyncRecord> allNodes = result.getManagementNodes().values(); + if (me.getRemoteTimestamp()!=null) + allNodes = Iterables.transform(allNodes, new MarkAwolNodes(me)); + Builder builder = ManagementPlaneSyncRecordImpl.builder() + .masterNodeId(result.getMasterNodeId()) + .nodes(allNodes); + builder.node(me); + if (getTransitionTargetNodeState() == ManagementNodeState.MASTER) { + builder.masterNodeId(ownNodeId); + } + result = builder.build(); + } + + if (i>0) { + managementStateReadPersistenceMetrics.noteError("Succeeded only on attempt "+(i+1)+": "+lastException); + } + managementStateReadPersistenceMetrics.noteSuccess(Duration.of(timer)); + return result; + } catch (IOException e) { + if (i < (maxLoadAttempts - 1)) { + if (LOG.isDebugEnabled()) LOG.debug("Problem loading mangement-plane memento attempt "+(i+1)+"/"+maxLoadAttempts+"; retrying", e); + } + lastException = e; + } + } + String message = "Failed to load mangement-plane memento "+maxLoadAttempts+" consecutive times"; + managementStateReadPersistenceMetrics.noteError(message+": "+lastException); + managementStateReadPersistenceMetrics.noteFailure(Duration.of(timer)); + + throw new IllegalStateException(message, lastException); + } 
+ + protected ManagementNodeSyncRecord createManagementNodeSyncRecord(boolean useLocalTimestampAsRemoteTimestamp) { + long timestamp = currentTimeMillis(); + brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord.Builder builder = BasicManagementNodeSyncRecord.builder() + .brooklynVersion(BrooklynVersion.get()) + .nodeId(ownNodeId) + .status(getNodeState()) + .priority(getPriority()) + .localTimestamp(timestamp) + .uri(managementContext.getManagementNodeUri().orNull()); + if (useLocalTimestampAsRemoteTimestamp) + builder.remoteTimestamp(timestamp); + else if (optionalRemoteTickerUtc!=null) { + builder.remoteTimestamp(optionalRemoteTickerUtc.read()); + } + return builder.build(); + } + + /** + * Gets the current time, using the {@link #localTickerUtc}. Normally this is equivalent of {@link System#currentTimeMillis()}, + * but in test environments a custom {@link Ticker} can be injected via {@link #setLocalTicker(Ticker)} to allow testing of + * specific timing scenarios. + */ + protected long currentTimeMillis() { + return localTickerUtc.read(); + } + + /** + * Infers the health of a node - if it last reported itself as healthy (standby or master), but we haven't heard + * from it in a long time then report that node as failed; otherwise report its health as-is. + */ + private class MarkAwolNodes implements Function<ManagementNodeSyncRecord, ManagementNodeSyncRecord> { + private final ManagementNodeSyncRecord referenceNode; + private MarkAwolNodes(ManagementNodeSyncRecord referenceNode) { + this.referenceNode = referenceNode; + } + @Nullable + @Override + public ManagementNodeSyncRecord apply(@Nullable ManagementNodeSyncRecord input) { + if (input == null) return null; + if (!(input.getStatus() == ManagementNodeState.STANDBY || input.getStatus() == ManagementNodeState.HOT_STANDBY || input.getStatus() == ManagementNodeState.MASTER || input.getStatus() == ManagementNodeState.HOT_BACKUP)) return input; + if (isHeartbeatOk(input, referenceNode)) return input; + return BasicManagementNodeSyncRecord.builder() + .from(input) + .status(ManagementNodeState.FAILED) + .build(); + } + } + + @Override + public String toString() { + return super.toString()+"[node:"+ownNodeId+";running="+running+"]"; + } + + @Override + public Map<String,Object> getMetrics() { + Map<String,Object> result = MutableMap.of(); + + result.put("state", getNodeState()); + result.put("uptime", Time.makeTimeStringRounded(Duration.millis(currentTimeMillis()-startTimeUtc))); + result.put("currentTimeUtc", currentTimeMillis()); + result.put("startTimeUtc", startTimeUtc); + result.put("highAvailability", MutableMap.<String,Object>of( + "priority", getPriority(), + "pollPeriod", getPollPeriod().toMilliseconds(), + "heartbeatTimeout", getHeartbeatTimeout().toMilliseconds(), + "history", nodeStateHistory)); + + result.putAll(managementContext.getRebindManager().getMetrics()); + result.put("managementStatePersistence", + MutableMap.of("read", managementStateReadPersistenceMetrics, "write", managementStateWritePersistenceMetrics)); + + return result; + } + + @Override + public long getLastStateChange() { + if (nodeStateHistory.size() > 0) { + return (Long)nodeStateHistory.get(0).get("timestamp"); + } else { + return 0; + } + } + +}
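For readers skimming the diff above: the failover decision in HighAvailabilityManagerImpl boils down to two steps described in its class javadoc — a heartbeat-staleness check against the records in the shared persister (isHeartbeatOk), and a deterministic choice of the next master among the healthy nodes (the default AlphabeticMasterChooser), so every standby computes the same winner and exactly one node promotes itself. The standalone sketch below is not part of this commit and is not the Brooklyn API; the names NodeRecord and chooseMaster are hypothetical, and it simplifies by comparing heartbeats against the local clock rather than the persisted remote timestamps used in the real class.

```java
import java.util.*;

/** Simplified, illustrative model of the HA failover decision; not the Brooklyn API. */
class HaElectionSketch {

    /** Hypothetical stand-in for ManagementNodeSyncRecord: just an id and a heartbeat timestamp. */
    static final class NodeRecord {
        final String nodeId;
        final long heartbeatUtc;   // last write observed in the shared store, millis UTC
        NodeRecord(String nodeId, long heartbeatUtc) {
            this.nodeId = nodeId;
            this.heartbeatUtc = heartbeatUtc;
        }
    }

    /** Mirrors the idea of isHeartbeatOk: a node is healthy if its heartbeat is recent enough. */
    static boolean heartbeatOk(NodeRecord candidate, long nowUtc, long heartbeatTimeoutMillis) {
        return candidate != null && (nowUtc - candidate.heartbeatUtc) <= heartbeatTimeoutMillis;
    }

    /** Deterministic election among healthy nodes, in the spirit of AlphabeticMasterChooser:
     *  every standby computes the same answer, so exactly one node promotes itself. */
    static Optional<NodeRecord> chooseMaster(Collection<NodeRecord> nodes, long nowUtc, long timeoutMillis) {
        return nodes.stream()
                .filter(n -> heartbeatOk(n, nowUtc, timeoutMillis))
                .min(Comparator.comparing((NodeRecord n) -> n.nodeId));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long timeout = 30_000;  // cf. the brooklyn.ha.heartbeatTimeout default of 30 seconds
        List<NodeRecord> plane = Arrays.asList(
                new NodeRecord("node-b", now - 5_000),      // healthy standby
                new NodeRecord("node-a", now - 120_000),    // stale ex-master: heartbeat too old
                new NodeRecord("node-c", now - 1_000));     // healthy standby

        // node-a is skipped because its heartbeat exceeds the timeout; node-b wins alphabetically.
        chooseMaster(plane, now, timeout)
                .ifPresent(winner -> System.out.println("promote " + winner.nodeId));
    }
}
```

In the committed class the comparison is made between persisted remote timestamps rather than the local clock, and the election only runs after publishHealth() has written this node's own record, but the shape of the decision is the same.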
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordDeltaImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordDeltaImpl.java b/core/src/main/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordDeltaImpl.java new file mode 100644 index 0000000..1ae4ea7 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordDeltaImpl.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.ha; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.util.Collection; + +import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister.Delta; + +import com.google.common.annotations.Beta; +import com.google.common.collect.Sets; + +/** + * @since 0.7.0 + * + * @author aled + */ +@Beta +public class ManagementPlaneSyncRecordDeltaImpl implements Delta { + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Collection<ManagementNodeSyncRecord> nodes = Sets.newLinkedHashSet(); + private Collection <String> removedNodeIds = Sets.newLinkedHashSet(); + private MasterChange masterChange = MasterChange.NO_CHANGE; + private String master; + private String expectedOldMaster; + + public Builder node(ManagementNodeSyncRecord node) { + nodes.add(checkNotNull(node, "node")); return this; + } + public Builder removedNodeId(String id) { + removedNodeIds.add(checkNotNull(id, "id")); return this; + } + public Builder setMaster(String nodeId) { + masterChange = MasterChange.SET_MASTER; + master = checkNotNull(nodeId, "masterId"); + return this; + } + public Builder clearMaster(String optionalExpectedNodeId) { + masterChange = MasterChange.CLEAR_MASTER; + this.expectedOldMaster = optionalExpectedNodeId; + return this; + } + public Delta build() { + return new ManagementPlaneSyncRecordDeltaImpl(this); + } + } + + private final Collection<ManagementNodeSyncRecord> nodes; + private final Collection <String> removedNodeIds; + private final MasterChange masterChange; + private String masterId; + private String expectedOldMaster; + + ManagementPlaneSyncRecordDeltaImpl(Builder builder) { + nodes = builder.nodes; + removedNodeIds = builder.removedNodeIds; + masterChange = builder.masterChange; + masterId = builder.master; + this.expectedOldMaster = builder.expectedOldMaster; + checkState((masterChange == 
MasterChange.SET_MASTER) ? (masterId != null) : (masterId == null), + "invalid combination: change=%s; masterId=%s", masterChange, masterId); + } + + @Override + public Collection<ManagementNodeSyncRecord> getNodes() { + return nodes; + } + + @Override + public Collection<String> getRemovedNodeIds() { + return removedNodeIds; + } + + @Override + public MasterChange getMasterChange() { + return masterChange; + } + + @Override + public String getNewMasterOrNull() { + return masterId; + } + + @Override + public String getExpectedMasterToClear() { + return expectedOldMaster; + } + + @Override + public String toString() { + return getClass().getCanonicalName()+"["+ + (masterChange!=null && masterChange != MasterChange.NO_CHANGE ? + masterChange+": "+expectedOldMaster+"->"+masterId+"; " : "")+ + "nodes: "+nodes+ + (removedNodeIds!=null && !removedNodeIds.isEmpty() ? "; removing: "+removedNodeIds : "") + +"]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterToObjectStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterToObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterToObjectStore.java new file mode 100644 index 0000000..7f188be --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterToObjectStore.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.management.ha; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.brooklyn.api.management.ManagementContext; +import org.apache.brooklyn.api.management.ha.HighAvailabilityMode; +import org.apache.brooklyn.api.management.ha.ManagementNodeState; +import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.rebind.persister.MementoSerializer; +import brooklyn.entity.rebind.persister.PersistenceObjectStore; +import brooklyn.entity.rebind.persister.PersistenceObjectStore.StoreObjectAccessorWithLock; +import brooklyn.entity.rebind.persister.RetryingMementoSerializer; +import brooklyn.entity.rebind.persister.StoreObjectAccessorLocking; +import brooklyn.entity.rebind.persister.XmlMementoSerializer; +import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord; +import brooklyn.entity.rebind.plane.dto.ManagementPlaneSyncRecordImpl; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Maps; + +/** + * Structure of files is: + * <ul> + * <li>{@code plane/} - top-level directory + * <ul> + * <li>{@code master} - contains the id of the management-node that is currently master + * <li>{@code change.log} - log of changes made + * <li>{@code nodes/} - sub-directory, containing one file per management-node + * <ul> + * <li>{@code a9WiuVKp} - file named after the management-node's id, containing the management node's current state + * <li>{@code E1eDXQF3} + * </ul> + * </ul> + * </ul> + * + * All writes are done synchronously. + * + * @since 0.7.0 + * + * @author aled + */ +@Beta +public class ManagementPlaneSyncRecordPersisterToObjectStore implements ManagementPlaneSyncRecordPersister { + + // TODO Multiple node appending to change.log could cause strange interleaving, or perhaps even data loss? + // But this file is not critical to functionality. + + // TODO Should ManagementPlaneSyncRecordPersister.Delta be different so can tell what is a significant event, + // and thus log it in change.log - currently only subset of significant things being logged. + + private static final Logger LOG = LoggerFactory.getLogger(ManagementPlaneSyncRecordPersisterToObjectStore.class); + + private static final Duration SHUTDOWN_TIMEOUT = Duration.TEN_SECONDS; + private static final Duration SYNC_WRITE_TIMEOUT = Duration.TEN_SECONDS; + public static final String NODES_SUB_PATH = "nodes"; + + // TODO Leak if we go through lots of managers; but tiny! 
+ private final ConcurrentMap<String, StoreObjectAccessorWithLock> nodeWriters = Maps.newConcurrentMap(); + + private StoreObjectAccessorWithLock masterWriter; + private StoreObjectAccessorWithLock changeLogWriter; + + private ManagementContext mgmt; + private final PersistenceObjectStore objectStore; + private final MementoSerializer<Object> serializer; + + private static final int MAX_SERIALIZATION_ATTEMPTS = 5; + + private boolean started = false; + private volatile boolean running = true; + + @VisibleForTesting + /** allows, when testing, to be able to override file times / blobstore times with time from the ticker */ + private boolean preferRemoteTimestampInMemento = false; + + /** + * @param mgmt not used much at present but handy to ensure we know it so that obj store is prepared + * @param objectStore the objectStore use to read/write management-plane data; + * this must have been {@link PersistenceObjectStore#prepareForSharedUse(brooklyn.entity.rebind.persister.PersistMode, HighAvailabilityMode)} + * @param classLoader ClassLoader to use when deserializing data + */ + public ManagementPlaneSyncRecordPersisterToObjectStore(ManagementContext mgmt, PersistenceObjectStore objectStore, ClassLoader classLoader) { + this.mgmt = mgmt; + this.objectStore = checkNotNull(objectStore, "objectStore"); + + MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(checkNotNull(classLoader, "classLoader")); + this.serializer = new RetryingMementoSerializer<Object>(rawSerializer, MAX_SERIALIZATION_ATTEMPTS); + + objectStore.createSubPath(NODES_SUB_PATH); + + LOG.debug("ManagementPlaneMemento-persister will use store "+objectStore); + } + + protected synchronized void init() { + if (!started) { + started = true; + //Leading slash causes problems in SL, it's not a correct file name so remove it. + //But once removed we can't load the master file from existing persistence stores. + //Try to detect if the old file exists, if so use old-style names, otherwise use the correct names. 
+ masterWriter = new StoreObjectAccessorLocking(objectStore.newAccessor("/master")); + if (masterWriter.get() != null) { + changeLogWriter = new StoreObjectAccessorLocking(objectStore.newAccessor("/change.log")); + } else { + masterWriter = new StoreObjectAccessorLocking(objectStore.newAccessor("master")); + changeLogWriter = new StoreObjectAccessorLocking(objectStore.newAccessor("change.log")); + } + } + } + + @VisibleForTesting + public void preferRemoteTimestampInMemento() { + preferRemoteTimestampInMemento = true; + } + + @Override + public void stop() { + running = false; + try { + for (StoreObjectAccessorWithLock writer : nodeWriters.values()) { + try { + writer.waitForCurrentWrites(SHUTDOWN_TIMEOUT); + } catch (TimeoutException e) { + LOG.warn("Timeout during shutdown, waiting for write of "+writer+"; continuing"); + } + } + try { + masterWriter.waitForCurrentWrites(SHUTDOWN_TIMEOUT); + } catch (TimeoutException e) { + LOG.warn("Timeout during shutdown, waiting for write of "+masterWriter+"; continuing"); + } + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + } + + @Override + public ManagementPlaneSyncRecord loadSyncRecord() throws IOException { + if (!running) { + throw new IllegalStateException("Persister not running; cannot load memento from "+ objectStore.getSummaryName()); + } + init(); + + // Note this is called a lot - every time we check the heartbeats + if (LOG.isTraceEnabled()) LOG.trace("Loading management-plane memento from {}", objectStore.getSummaryName()); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + ManagementPlaneSyncRecordImpl.Builder builder = ManagementPlaneSyncRecordImpl.builder(); + + // Be careful about order: if the master-file says nodeX then nodeX's file must have an up-to-date timestamp. + // Therefore read master file first, followed by the other node-files. + String masterNodeId = masterWriter.get(); + if (masterNodeId == null) { + LOG.debug("No master-memento deserialized from file "+masterWriter+"; ignoring and continuing (normal on startup, should cause an error later in live operation)"); + } else { + builder.masterNodeId(masterNodeId); + } + + // Load node-files + List<String> nodeFiles = objectStore.listContentsWithSubPath(NODES_SUB_PATH); + LOG.trace("Loading nodes from {}; {} nodes.", + new Object[]{objectStore.getSummaryName(), nodeFiles.size()}); + + for (String nodeFile : nodeFiles) { + PersistenceObjectStore.StoreObjectAccessor objectAccessor = objectStore.newAccessor(nodeFile); + String nodeContents = null; + Exception problem = null; + try { + nodeContents = objectAccessor.get(); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + problem = e; + } + if (problem!=null || Strings.isBlank(nodeContents)) { + // happens if node has gone away, or if FileBasedObjectStore.moveFile is not atomic, + // i.e. 
it has deleted but not updated it yet + if (objectAccessor.exists()) { + throw Exceptions.propagate(new IllegalStateException("Node record "+nodeFile+" could not be read when "+mgmt.getManagementNodeId()+" was scanning", problem)); + } else { + LOG.warn("Node record "+nodeFile+" went away while "+mgmt.getManagementNodeId()+" was scanning, ignoring (it has probably been terminated)"); + // if file was deleted, silently ignore + continue; + } + } + ManagementNodeSyncRecord memento = (ManagementNodeSyncRecord) serializer.fromString(nodeContents); + if (memento == null) { + // shouldn't happen + throw Exceptions.propagate(new IllegalStateException("Node record "+nodeFile+" could not be deserialized when "+mgmt.getManagementNodeId()+" was scanning: "+nodeContents, problem)); + } else { + if (memento.getRemoteTimestamp()!=null && preferRemoteTimestampInMemento) { + // in test mode, the remote timestamp is stored in the file + } else { + if (memento.getRemoteTimestamp()!=null) { + LOG.debug("Ignoring remote timestamp in memento file ("+memento+"); looks like this data has been manually copied in"); + } + Date lastModifiedDate = objectAccessor.getLastModifiedDate(); + ((BasicManagementNodeSyncRecord)memento).setRemoteTimestamp(lastModifiedDate!=null ? lastModifiedDate.getTime() : null); + } + builder.node(memento); + } + } + + if (LOG.isDebugEnabled()) LOG.trace("Loaded management-plane memento; {} nodes, took {}", + nodeFiles.size(), + Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS))); + return builder.build(); + } + + @Override + public void delta(Delta delta) { + if (!running) { + if (LOG.isDebugEnabled()) LOG.debug("Persister not running; ignoring checkpointed delta of manager-memento"); + return; + } + init(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + if (LOG.isTraceEnabled()) LOG.trace("Checkpointing delta of manager-memento; updating {}", delta); + + for (ManagementNodeSyncRecord m : delta.getNodes()) { + persist(m); + } + for (String id : delta.getRemovedNodeIds()) { + deleteNode(id); + } + switch (delta.getMasterChange()) { + case NO_CHANGE: + break; // no-op + case SET_MASTER: + persistMaster(checkNotNull(delta.getNewMasterOrNull()), null); + break; + case CLEAR_MASTER: + persistMaster("", delta.getExpectedMasterToClear()); + break; // no-op + default: + throw new IllegalStateException("Unknown state for master-change: "+delta.getMasterChange()); + } + if (LOG.isDebugEnabled()) LOG.debug("Checkpointed delta of manager-memento in "+Time.makeTimeStringRounded(stopwatch)+": "+delta); + } + + private void persistMaster(String nodeId, String optionalExpectedId) { + if (optionalExpectedId!=null) { + String currentRemoteMaster = masterWriter.get(); + if (currentRemoteMaster==null) { + // okay to have nothing at remote + } else if (!currentRemoteMaster.trim().equals(optionalExpectedId.trim())) { + LOG.warn("Master at server is "+(Strings.isBlank(currentRemoteMaster) ? "<none>" : currentRemoteMaster)+"; expected "+optionalExpectedId+" " + + (Strings.isNonBlank(nodeId) ? 
"and would set as "+nodeId : "and would clear") + + ", so not applying (yet)"); + return; + } + } + masterWriter.put(nodeId); + try { + masterWriter.waitForCurrentWrites(SYNC_WRITE_TIMEOUT); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + changeLogWriter.append(Time.makeDateString() + ": set master to " + nodeId + "\n"); + try { + changeLogWriter.waitForCurrentWrites(SYNC_WRITE_TIMEOUT); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } + + @Override + @VisibleForTesting + public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException { + for (StoreObjectAccessorWithLock writer : nodeWriters.values()) { + writer.waitForCurrentWrites(timeout); + } + masterWriter.waitForCurrentWrites(timeout); + } + + public void checkpoint(ManagementPlaneSyncRecord record) { + init(); + for (ManagementNodeSyncRecord node : record.getManagementNodes().values()) { + // Check included in case the node in the memento is the one being initialised by + // BrooklynLauncher in the copy state command. + if (!ManagementNodeState.INITIALIZING.equals(node.getStatus()) && node.getNodeId() != null) { + persist(node); + } + } + } + + private void persist(ManagementNodeSyncRecord node) { + StoreObjectAccessorWithLock writer = getOrCreateNodeWriter(node.getNodeId()); + boolean fileExists = writer.exists(); + writer.put(serializer.toString(node)); + try { + writer.waitForCurrentWrites(SYNC_WRITE_TIMEOUT); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + if (!fileExists) { + changeLogWriter.append(Time.makeDateString()+": created node "+node.getNodeId()+"\n"); + } + if (node.getStatus() == ManagementNodeState.TERMINATED || node.getStatus() == ManagementNodeState.FAILED) { + changeLogWriter.append(Time.makeDateString()+": set node "+node.getNodeId()+" status to "+node.getStatus()+"\n"); + } + } + + private void deleteNode(String nodeId) { + getOrCreateNodeWriter(nodeId).delete(); + changeLogWriter.append(Time.makeDateString()+": deleted node "+nodeId+"\n"); + } + + private StoreObjectAccessorWithLock getOrCreateNodeWriter(String nodeId) { + PersistenceObjectStore.StoreObjectAccessorWithLock writer = nodeWriters.get(nodeId); + if (writer == null) { + nodeWriters.putIfAbsent(nodeId, + new StoreObjectAccessorLocking(objectStore.newAccessor(NODES_SUB_PATH+"/"+nodeId))); + writer = nodeWriters.get(nodeId); + } + return writer; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/ha/MasterChooser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/management/ha/MasterChooser.java b/core/src/main/java/org/apache/brooklyn/core/management/ha/MasterChooser.java new file mode 100644 index 0000000..2a2bd09 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/management/ha/MasterChooser.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.management.ha; + +import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; +import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; + +import brooklyn.util.time.Duration; + +import com.google.common.annotations.Beta; + +/** + * For choosing which management node to promote, when master detected as failed or stopped. + * + * @since 0.7.0 + * + * @author aled + */ +@Beta +public interface MasterChooser { + + ManagementNodeSyncRecord choose(ManagementPlaneSyncRecord memento, Duration heartbeatTimeout, String ownNodeId); + +}
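
For illustration of the MasterChooser contract only: the shipped selection logic lives in BasicMasterChooser.AlphabeticMasterChooser, so the sketch below is not that implementation. It assumes a candidate is any node whose last heartbeat (the remote timestamp written in epoch millis by the persister above) is within the heartbeat timeout and whose state is not INITIALIZING, FAILED or TERMINATED, that returning null is an acceptable way to signal "no candidate", and that Duration exposes toMilliseconds(); the class name is hypothetical.

package org.apache.brooklyn.core.management.ha;

import org.apache.brooklyn.api.management.ha.ManagementNodeState;
import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;

import brooklyn.util.time.Duration;

/** Illustrative sketch only: picks the eligible node with the lexicographically smallest id. */
public class LowestNodeIdMasterChooser implements MasterChooser {

    @Override
    public ManagementNodeSyncRecord choose(ManagementPlaneSyncRecord memento, Duration heartbeatTimeout, String ownNodeId) {
        long oldestAcceptable = System.currentTimeMillis() - heartbeatTimeout.toMilliseconds();
        ManagementNodeSyncRecord best = null;
        for (ManagementNodeSyncRecord node : memento.getManagementNodes().values()) {
            // skip nodes in states the persister above treats as non-eligible or terminal
            if (node.getStatus() == ManagementNodeState.INITIALIZING
                    || node.getStatus() == ManagementNodeState.FAILED
                    || node.getStatus() == ManagementNodeState.TERMINATED) {
                continue;
            }
            // skip nodes whose last heartbeat is missing or too old (assumes epoch-millis timestamps)
            Long heartbeat = node.getRemoteTimestamp();
            if (heartbeat == null || heartbeat < oldestAcceptable) continue;
            if (best == null || node.getNodeId().compareTo(best.getNodeId()) < 0) {
                best = node;
            }
        }
        return best; // assumption: null signals "no candidate"; callers would need to tolerate that
    }
}

Because node ids are unique, a purely alphabetic rule like this is deterministic: every standby evaluating the same sync record reaches the same conclusion about which node should promote itself.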

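As a usage note for the persister above, here is a hedged sketch of a read-only diagnostic that dumps the current plane state. It assumes the object store has already been prepared with prepareForSharedUse(...) as the constructor javadoc requires, and that ManagementPlaneSyncRecord exposes getMasterNodeId() alongside the getManagementNodes() call used by checkpoint(...); the class and method names below are hypothetical.

package org.apache.brooklyn.core.management.ha;

import java.io.IOException;

import org.apache.brooklyn.api.management.ManagementContext;
import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;

import brooklyn.entity.rebind.persister.PersistenceObjectStore;

/** Illustrative sketch only: prints the plane state held in an already-prepared object store. */
public class PlaneStateDumper {

    /** Assumes {@code objectStore} was prepared with prepareForSharedUse(...) beforehand. */
    public static void dump(ManagementContext mgmt, PersistenceObjectStore objectStore) throws IOException {
        ManagementPlaneSyncRecordPersisterToObjectStore persister =
                new ManagementPlaneSyncRecordPersisterToObjectStore(
                        mgmt, objectStore, PlaneStateDumper.class.getClassLoader());
        try {
            ManagementPlaneSyncRecord plane = persister.loadSyncRecord();
            // getMasterNodeId() is an assumption; getManagementNodes() is the call used by checkpoint(...) above
            System.out.println("master: " + plane.getMasterNodeId());
            for (ManagementNodeSyncRecord node : plane.getManagementNodes().values()) {
                System.out.println("  " + node.getNodeId() + " -> " + node.getStatus()
                        + " (last heartbeat " + node.getRemoteTimestamp() + ")");
            }
        } finally {
            persister.stop(); // waits briefly for any in-flight writes, then marks the persister as not running
        }
    }
}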