http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java b/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java deleted file mode 100644 index c4489fe..0000000 --- a/core/src/main/java/brooklyn/management/ha/HighAvailabilityManagerImpl.java +++ /dev/null @@ -1,1105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.management.ha; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import java.io.IOException; -import java.net.URI; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.api.entity.Application; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.rebind.RebindManager; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.ha.HighAvailabilityManager; -import org.apache.brooklyn.api.management.ha.HighAvailabilityMode; -import org.apache.brooklyn.api.management.ha.ManagementNodeState; -import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister; -import org.apache.brooklyn.api.management.ha.MementoCopyMode; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister.Delta; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.BrooklynVersion; -import brooklyn.catalog.internal.BasicBrooklynCatalog; -import brooklyn.catalog.internal.CatalogDto; -import brooklyn.config.BrooklynServerConfig; -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.BrooklynTaskTags; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.entity.rebind.RebindManagerImpl; -import brooklyn.entity.rebind.persister.BrooklynPersistenceUtils; -import brooklyn.entity.rebind.persister.BrooklynPersistenceUtils.CreateBackupMode; -import brooklyn.entity.rebind.persister.PersistenceActivityMetrics; -import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord; -import 
brooklyn.entity.rebind.plane.dto.ManagementPlaneSyncRecordImpl; -import brooklyn.entity.rebind.plane.dto.ManagementPlaneSyncRecordImpl.Builder; -import brooklyn.internal.BrooklynFeatureEnablement; -import brooklyn.management.ha.BasicMasterChooser.AlphabeticMasterChooser; -import brooklyn.management.internal.BrooklynObjectManagementMode; -import brooklyn.management.internal.LocalEntityManager; -import brooklyn.management.internal.LocationManagerInternal; -import brooklyn.management.internal.ManagementContextInternal; -import brooklyn.management.internal.ManagementTransitionMode; -import brooklyn.util.collections.MutableList; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.exceptions.ReferenceWithError; -import brooklyn.util.task.ScheduledTask; -import brooklyn.util.task.Tasks; -import brooklyn.util.text.Strings; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.annotations.Beta; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.base.Ticker; -import com.google.common.collect.Iterables; - -/** - * This is the guts of the high-availability solution in Brooklyn. - * <p> - * Multiple brooklyn nodes can be started to form a single management plane, where one node is - * designated master and the others are "warm standbys". On termination or failure of the master, - * the standbys deterministically decide which standby should become master (see {@link MasterChooser}). - * That standby promotes itself. - * <p> - * The management nodes communicate their health/status via the {@link ManagementPlaneSyncRecordPersister}. 
- * For example, if using {@link ManagementPlaneSyncRecordPersisterToObjectStore} with a shared blobstore or - * filesystem/NFS mount, then each management-node periodically writes its state. - * This acts as a heartbeat, being read by the other management-nodes. - * <p> - * Promotion to master involves: - * <ol> - * <li>notifying the other management-nodes that it is now master - * <li>calling {@link RebindManager#rebind(ClassLoader, org.apache.brooklyn.api.entity.rebind.RebindExceptionHandler, ManagementNodeState)} to read all persisted entity state, and thus reconstitute the entities. - * </ol> - * <p> - * Future improvements in this area will include brooklyn-managing-brooklyn to decide + promote - * the standby. - * - * @since 0.7.0 - * - * @author aled - */ -@Beta -public class HighAvailabilityManagerImpl implements HighAvailabilityManager { - - public final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newConfigKey(Duration.class, "brooklyn.ha.pollPeriod", - "How often nodes should poll to detect whether master is healthy", Duration.seconds(1)); - public final ConfigKey<Duration> HEARTBEAT_TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "brooklyn.ha.heartbeatTimeout", - "Maximum allowable time for detection of a peer's heartbeat; if no sign of master after this time, " - + "another node may promote itself", Duration.THIRTY_SECONDS); - - @VisibleForTesting /* only used in tests currently */ - public static interface PromotionListener { - public void promotingToMaster(); - } - - private static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityManagerImpl.class); - - private final ManagementContextInternal managementContext; - private volatile String ownNodeId; - private volatile ManagementPlaneSyncRecordPersister persister; - private volatile PromotionListener promotionListener; - private volatile MasterChooser masterChooser = new AlphabeticMasterChooser(); - private volatile Ticker localTickerUtc = new Ticker() { - // strictly not a ticker because 
returns millis UTC, but it works fine even so - @Override - public long read() { - return System.currentTimeMillis(); - } - }; - private volatile Ticker optionalRemoteTickerUtc = null; - - private volatile Task<?> pollingTask; - private volatile boolean disabled; - private volatile boolean running; - private volatile ManagementNodeState nodeState = ManagementNodeState.INITIALIZING; - private volatile boolean nodeStateTransitionComplete = false; - private volatile long priority = 0; - - private final static int MAX_NODE_STATE_HISTORY = 200; - private final List<Map<String,Object>> nodeStateHistory = MutableList.of(); - - private volatile transient Duration pollPeriodLocalOverride; - private volatile transient Duration heartbeatTimeoutOverride; - - private volatile ManagementPlaneSyncRecord lastSyncRecord; - - private volatile PersistenceActivityMetrics managementStateWritePersistenceMetrics = new PersistenceActivityMetrics(); - private volatile PersistenceActivityMetrics managementStateReadPersistenceMetrics = new PersistenceActivityMetrics(); - private final long startTimeUtc; - - public HighAvailabilityManagerImpl(ManagementContextInternal managementContext) { - this.managementContext = managementContext; - startTimeUtc = localTickerUtc.read(); - } - - @Override - public HighAvailabilityManagerImpl setPersister(ManagementPlaneSyncRecordPersister persister) { - this.persister = checkNotNull(persister, "persister"); - return this; - } - - @Override - public ManagementPlaneSyncRecordPersister getPersister() { - return persister; - } - - protected synchronized Duration getPollPeriod() { - if (pollPeriodLocalOverride!=null) return pollPeriodLocalOverride; - return managementContext.getBrooklynProperties().getConfig(POLL_PERIOD); - } - - /** Overrides {@link #POLL_PERIOD} from brooklyn config, - * including e.g. 
{@link Duration#PRACTICALLY_FOREVER} to disable polling; - * or <code>null</code> to clear a local override */ - public HighAvailabilityManagerImpl setPollPeriod(Duration val) { - this.pollPeriodLocalOverride = val; - if (running) { - registerPollTask(); - } - return this; - } - - public HighAvailabilityManagerImpl setMasterChooser(MasterChooser val) { - this.masterChooser = checkNotNull(val, "masterChooser"); - return this; - } - - public synchronized Duration getHeartbeatTimeout() { - if (heartbeatTimeoutOverride!=null) return heartbeatTimeoutOverride; - return managementContext.getBrooklynProperties().getConfig(HEARTBEAT_TIMEOUT); - } - - /** Overrides {@link #HEARTBEAT_TIMEOUT} from brooklyn config, - * including e.g. {@link Duration#PRACTICALLY_FOREVER} to prevent failover due to heartbeat absence; - * or <code>null</code> to clear a local override */ - public HighAvailabilityManagerImpl setHeartbeatTimeout(Duration val) { - this.heartbeatTimeoutOverride = val; - return this; - } - - /** A ticker that reads in milliseconds, for populating local timestamps. - * Defaults to System.currentTimeMillis(); may be overridden e.g. for testing. */ - public HighAvailabilityManagerImpl setLocalTicker(Ticker val) { - this.localTickerUtc = checkNotNull(val); - return this; - } - - /** A ticker that reads in milliseconds, for overriding remote timestamps. - * Defaults to null which means to use the remote timestamp. - * Only for testing as this records the remote timestamp in the object. - * <p> - * If this is supplied, one must also set {@link ManagementPlaneSyncRecordPersisterToObjectStore#useRemoteTimestampInMemento()}. 
*/ - @VisibleForTesting - public HighAvailabilityManagerImpl setRemoteTicker(Ticker val) { - this.optionalRemoteTickerUtc = val; - return this; - } - - public HighAvailabilityManagerImpl setPromotionListener(PromotionListener val) { - this.promotionListener = checkNotNull(val, "promotionListener"); - return this; - } - - @Override - public boolean isRunning() { - return running; - } - - @Override - public void disabled() { - disabled = true; - ownNodeId = managementContext.getManagementNodeId(); - // this is notionally the master, just not running; see javadoc for more info - stop(ManagementNodeState.MASTER); - - } - - @Override - public void start(HighAvailabilityMode startMode) { - nodeStateTransitionComplete = true; - disabled = false; - running = true; - changeMode(startMode, true, true); - } - - @Override - public void changeMode(HighAvailabilityMode startMode) { - changeMode(startMode, false, false); - } - - @VisibleForTesting - @Beta - public void changeMode(HighAvailabilityMode startMode, boolean preventElectionOnExplicitStandbyMode, boolean failOnExplicitModesIfUnusual) { - if (!running) { - // if was not running then start as disabled mode, then proceed as normal - LOG.info("HA changing mode to "+startMode+" from "+getInternalNodeState()+" when not running, forcing an intermediate start as DISABLED then will convert to "+startMode); - start(HighAvailabilityMode.DISABLED); - } - if (getNodeState()==ManagementNodeState.FAILED || getNodeState()==ManagementNodeState.INITIALIZING) { - if (startMode!=HighAvailabilityMode.DISABLED) { - // if coming from FAILED (or INITIALIZING because we skipped start call) then treat as initializing - setInternalNodeState(ManagementNodeState.INITIALIZING); - } - } - - ownNodeId = managementContext.getManagementNodeId(); - // TODO Small race in that we first check, and then we'll do checkMaster() on first poll, - // so another node could have already become master or terminated in that window. 
- ManagementNodeSyncRecord existingMaster = hasHealthyMaster(); - boolean weAreRecognisedAsMaster = existingMaster!=null && ownNodeId.equals(existingMaster.getNodeId()); - boolean weAreMasterLocally = getInternalNodeState()==ManagementNodeState.MASTER; - - // catch error in some tests where mgmt context has a different mgmt context - if (managementContext.getHighAvailabilityManager()!=this) - throw new IllegalStateException("Cannot start an HA manager on a management context with a different HA manager!"); - - if (weAreMasterLocally) { - // demotion may be required; do this before triggering an election - switch (startMode) { - case MASTER: - case AUTO: - case DISABLED: - // no action needed, will do anything necessary below (or above) - break; - case HOT_STANDBY: - case HOT_BACKUP: - case STANDBY: - demoteTo(ManagementNodeState.of(startMode).get()); break; - default: - throw new IllegalStateException("Unexpected high availability mode "+startMode+" requested for "+this); - } - } - - ManagementNodeState oldState = getInternalNodeState(); - - // now do election - switch (startMode) { - case AUTO: - // don't care; let's start and see if we promote ourselves - if (getInternalNodeState()==ManagementNodeState.INITIALIZING) { - setInternalNodeState(ManagementNodeState.STANDBY); - } - publishAndCheck(true); - switch (getInternalNodeState()) { - case HOT_BACKUP: - if (!nodeStateTransitionComplete) throw new IllegalStateException("Cannot switch to AUTO when in the middle of a transition to "+getInternalNodeState()); - // else change us to standby, desiring to go to hot standby, and continue to below - setInternalNodeState(ManagementNodeState.STANDBY); - startMode = HighAvailabilityMode.HOT_BACKUP; - case HOT_STANDBY: - case STANDBY: - if (getInternalNodeState()==ManagementNodeState.STANDBY && oldState==ManagementNodeState.INITIALIZING && startMode!=HighAvailabilityMode.HOT_BACKUP - && 
BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY)) { - // auto requested; not promoted; so it should become hot standby - startMode = HighAvailabilityMode.HOT_STANDBY; - } - ManagementPlaneSyncRecord newState = loadManagementPlaneSyncRecord(true); - String masterNodeId = newState.getMasterNodeId(); - ManagementNodeSyncRecord masterNodeDetails = newState.getManagementNodes().get(masterNodeId); - LOG.info("Management node "+ownNodeId+" running as HA " + getInternalNodeState() + " autodetected" - + (startMode == HighAvailabilityMode.HOT_STANDBY || startMode == HighAvailabilityMode.HOT_BACKUP ? - " (will change to "+startMode+")" : "") - + ", " + - (Strings.isBlank(masterNodeId) ? "no master currently (other node should promote itself soon)" : "master " - + (existingMaster==null ? "(new) " : "") - + "is "+masterNodeId + - (masterNodeDetails==null || masterNodeDetails.getUri()==null ? " (no url)" : " at "+masterNodeDetails.getUri()))); - break; - case MASTER: - LOG.info("Management node "+ownNodeId+" running as HA MASTER autodetected"); - break; - default: - throw new IllegalStateException("Management node "+ownNodeId+" set to HA AUTO, encountered unexpected mode "+getInternalNodeState()); - } - break; - case MASTER: - if (!failOnExplicitModesIfUnusual || existingMaster==null) { - promoteToMaster(); - if (existingMaster!=null) { - LOG.info("Management node "+ownNodeId+" running as HA MASTER explicitly"); - } else { - LOG.info("Management node "+ownNodeId+" running as HA MASTER explicitly, stealing from "+existingMaster); - } - } else if (!weAreRecognisedAsMaster) { - throw new IllegalStateException("Master already exists; cannot run as master (master "+existingMaster.toVerboseString()+"); " - + "to trigger a promotion, set a priority and demote the current master"); - } else { - LOG.info("Management node "+ownNodeId+" already running as HA MASTER, when set explicitly"); - } - break; - case HOT_BACKUP: - 
setInternalNodeState(ManagementNodeState.HOT_BACKUP); - // then continue into next block - case STANDBY: - case HOT_STANDBY: - if (startMode!=HighAvailabilityMode.HOT_BACKUP) { - if (ManagementNodeState.isHotProxy(getInternalNodeState()) && startMode==HighAvailabilityMode.HOT_STANDBY) { - // if was hot_backup, we can immediately go hot_standby - setInternalNodeState(ManagementNodeState.HOT_STANDBY); - } else { - // from any other state, set standby, then perhaps switch to hot_standby later on (or might become master in the next block) - setInternalNodeState(ManagementNodeState.STANDBY); - } - } - if (ManagementNodeState.isStandby(getInternalNodeState())) { - if (!preventElectionOnExplicitStandbyMode) { - publishAndCheck(true); - } - if (failOnExplicitModesIfUnusual && existingMaster==null) { - LOG.error("Management node "+ownNodeId+" detected no master when "+startMode+" requested and existing master required; failing."); - throw new IllegalStateException("No existing master; cannot start as "+startMode); - } - } - String message = "Management node "+ownNodeId+" running as HA "+getNodeState()+" ("; - if (getNodeState().toString().equals(startMode.toString())) - message += "explicitly requested"; - else if (startMode==HighAvailabilityMode.HOT_STANDBY && getNodeState()==ManagementNodeState.STANDBY) - message += "caller requested "+startMode+", will attempt rebind for HOT_STANDBY next"; - else - message += "caller requested "+startMode; - - if (getNodeState()==ManagementNodeState.MASTER) { - message += " but election re-promoted this node)"; - } else { - ManagementPlaneSyncRecord newState = loadManagementPlaneSyncRecord(true); - if (Strings.isBlank(newState.getMasterNodeId())) { - message += "); no master currently"; - if (startMode != HighAvailabilityMode.HOT_BACKUP) message += " (subsequent election may repair)"; - } else { - message += "); master "+newState.getMasterNodeId(); - } - } - LOG.info(message); - break; - case DISABLED: - // safe just to run even if we 
weren't master - LOG.info("Management node "+ownNodeId+" HA DISABLED (was "+getInternalNodeState()+")"); - demoteTo(ManagementNodeState.FAILED); - if (pollingTask!=null) pollingTask.cancel(true); - break; - default: - throw new IllegalStateException("Unexpected high availability mode "+startMode+" requested for "+this); - } - - if ((startMode==HighAvailabilityMode.HOT_STANDBY || startMode==HighAvailabilityMode.HOT_BACKUP)) { - if (!ManagementNodeState.isHotProxy(oldState)) { - // now transition to hot proxy - nodeStateTransitionComplete = false; - if (startMode==HighAvailabilityMode.HOT_STANDBY) { - // if it should be hot standby, then we may need to promote - // inform the world that we are transitioning (but not eligible for promotion while going in to hot standby) - // (no harm in doing this twice) - publishHealth(); - } - try { - activateHotProxy(ManagementNodeState.of(startMode).get()).get(); - // error above now throws - nodeStateTransitionComplete = true; - publishHealth(); - - if (getNodeState()==ManagementNodeState.HOT_STANDBY || getNodeState()==ManagementNodeState.HOT_BACKUP) { - LOG.info("Management node "+ownNodeId+" now running as HA "+getNodeState()+"; " - + managementContext.getApplications().size()+" application"+Strings.s(managementContext.getApplications().size())+" loaded"); - } else { - // shouldn't come here, we should have gotten an error above - LOG.warn("Management node "+ownNodeId+" unable to promote to "+startMode+" (currently "+getNodeState()+"); " - + "(see log for further details)"); - } - } catch (Exception e) { - LOG.warn("Management node "+ownNodeId+" unable to promote to "+startMode+" (currently "+getNodeState()+"); rethrowing: "+Exceptions.collapseText(e)); - nodeStateTransitionComplete = true; - throw Exceptions.propagate(e); - } - } else { - // transitioning among hot proxy states - tell the rebind manager - managementContext.getRebindManager().stopReadOnly(); - 
managementContext.getRebindManager().startReadOnly(ManagementNodeState.of(startMode).get()); - nodeStateTransitionComplete = true; - } - } else { - nodeStateTransitionComplete = true; - } - if (startMode!=HighAvailabilityMode.DISABLED) - registerPollTask(); - } - - @Override - public void setPriority(long priority) { - this.priority = priority; - if (persister!=null) publishHealth(); - } - - @Override - public long getPriority() { - return priority; - } - - @Override - public void stop() { - LOG.debug("Stopping "+this); - stop(ManagementNodeState.TERMINATED); - } - - private void stop(ManagementNodeState newState) { - boolean wasRunning = running; - - running = false; - setInternalNodeState(newState); - if (pollingTask != null) pollingTask.cancel(true); - - if (wasRunning) { - try { - publishHealth(); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - LOG.error("Problem publishing manager-node health on termination (continuing)", e); - } - } - } - - /** returns the node state this node is trying to be in */ - public ManagementNodeState getTransitionTargetNodeState() { - return getInternalNodeState(); - } - - protected ManagementNodeState getInternalNodeState() { - return nodeState; - } - - protected void setInternalNodeState(ManagementNodeState newState) { - ManagementNodeState oldState = getInternalNodeState(); - synchronized (nodeStateHistory) { - if (this.nodeState != newState) { - nodeStateHistory.add(0, MutableMap.<String,Object>of("state", newState, "timestamp", currentTimeMillis())); - while (nodeStateHistory.size()>MAX_NODE_STATE_HISTORY) { - nodeStateHistory.remove(nodeStateHistory.size()-1); - } - } - ((RebindManagerImpl)managementContext.getRebindManager()).setAwaitingInitialRebind(running && - (ManagementNodeState.isHotProxy(newState) || newState==ManagementNodeState.MASTER)); - this.nodeState = newState; - } - - if (ManagementNodeState.isHotProxy(oldState) && !ManagementNodeState.isHotProxy(newState)) { - // could perhaps promote standby 
items on some transitions; but for now we stop the old read-only and re-load them - // TODO ideally there'd be an incremental rebind as well as an incremental persist - managementContext.getRebindManager().stopReadOnly(); - clearManagedItems(ManagementTransitionMode.transitioning(BrooklynObjectManagementMode.LOADED_READ_ONLY, BrooklynObjectManagementMode.UNMANAGED_PERSISTED)); - } - } - - @Override - public ManagementNodeState getNodeState() { - ManagementNodeState myNodeState = getInternalNodeState(); - if (myNodeState==ManagementNodeState.FAILED) return getInternalNodeState(); - // if target is master then we claim already being master, to prevent other nodes from taking it - // (we may fail subsequently of course) - if (myNodeState==ManagementNodeState.MASTER) return myNodeState; - - if (!nodeStateTransitionComplete) return ManagementNodeState.INITIALIZING; - return myNodeState; - } - - public ManagementPlaneSyncRecord getLastManagementPlaneSyncRecord() { - return lastSyncRecord; - } - - @SuppressWarnings("unchecked") - protected void registerPollTask() { - final Runnable job = new Runnable() { - @Override public void run() { - try { - publishAndCheck(false); - } catch (Exception e) { - if (running) { - LOG.error("Problem in HA-poller: "+e, e); - } else { - if (LOG.isDebugEnabled()) LOG.debug("Problem in HA-poller, but no longer running: "+e, e); - } - } catch (Throwable t) { - LOG.error("Problem in HA-poller: "+t, t); - throw Exceptions.propagate(t); - } - } - }; - Callable<Task<?>> taskFactory = new Callable<Task<?>>() { - @Override public Task<?> call() { - return Tasks.builder().dynamic(false).body(job).name("HA poller task").tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) - .description("polls HA status to see whether this node should promote").build(); - } - }; - - LOG.debug("Registering poll task for "+this+", period "+getPollPeriod()); - if (getPollPeriod().equals(Duration.PRACTICALLY_FOREVER)) { - // don't schedule - used for tests - // (scheduling fires off 
one initial task in the background before the delay, - // which affects tests that want to know exactly when publishing happens; - // TODO would be nice if scheduled task had a "no initial submission" flag ) - } else { - if (pollingTask!=null) pollingTask.cancel(true); - - ScheduledTask task = new ScheduledTask(MutableMap.of("period", getPollPeriod(), "displayName", "scheduled:[HA poller task]"), taskFactory); - pollingTask = managementContext.getExecutionManager().submit(task); - } - } - - /** invoked manually when initializing, and periodically thereafter */ - @VisibleForTesting - public synchronized void publishAndCheck(boolean initializing) { - publishHealth(); - checkMaster(initializing); - } - - protected synchronized void publishHealth() { - if (persister == null) { - LOG.info("Cannot publish management-node health as no persister"); - return; - } - - Stopwatch timer = Stopwatch.createStarted(); - try { - ManagementNodeSyncRecord memento = createManagementNodeSyncRecord(false); - Delta delta = ManagementPlaneSyncRecordDeltaImpl.builder().node(memento).build(); - persister.delta(delta); - managementStateWritePersistenceMetrics.noteSuccess(Duration.of(timer)); - if (LOG.isTraceEnabled()) LOG.trace("Published management-node health: {}", memento); - } catch (Throwable t) { - managementStateWritePersistenceMetrics.noteFailure(Duration.of(timer)); - managementStateWritePersistenceMetrics.noteError(t.toString()); - LOG.debug("Error publishing management-node health (rethrowing): "+t); - throw Exceptions.propagate(t); - } - } - - public void publishClearNonMaster() { - ManagementPlaneSyncRecord plane = getLastManagementPlaneSyncRecord(); - if (plane==null || persister==null) { - LOG.warn("Cannot clear HA node records; HA not active (or not yet loaded)"); - return; - } - brooklyn.management.ha.ManagementPlaneSyncRecordDeltaImpl.Builder db = ManagementPlaneSyncRecordDeltaImpl.builder(); - for (Map.Entry<String,ManagementNodeSyncRecord> node: 
plane.getManagementNodes().entrySet()) { - // only keep a node if it both claims master and is recognised as master; - // else ex-masters who died are kept around! - if (!ManagementNodeState.MASTER.equals(node.getValue().getStatus()) || - !Objects.equal(plane.getMasterNodeId(), node.getValue().getNodeId())) { - db.removedNodeId(node.getKey()); - } - } - persister.delta(db.build()); - // then get, so model is updated - loadManagementPlaneSyncRecord(true); - } - - protected synchronized void publishDemotion(boolean demotingFromMaster) { - checkState(getNodeState() != ManagementNodeState.MASTER, "node status must not be master when demoting", getNodeState()); - - if (persister == null) { - LOG.info("Cannot publish management-node health as no persister"); - return; - } - - ManagementNodeSyncRecord memento = createManagementNodeSyncRecord(false); - ManagementPlaneSyncRecordDeltaImpl.Builder deltaBuilder = ManagementPlaneSyncRecordDeltaImpl.builder() - .node(memento); - if (demotingFromMaster) { - deltaBuilder.clearMaster(ownNodeId); - } - - Delta delta = deltaBuilder.build(); - persister.delta(delta); - if (LOG.isTraceEnabled()) LOG.trace("Published management-node health: {}", memento); - } - - /** - * Publishes (via {@link #persister}) the state of this management node with itself set to master. 
- */ - protected synchronized void publishPromotionToMaster() { - checkState(getNodeState() == ManagementNodeState.MASTER, "node status must be master on publish, but is %s", getNodeState()); - - if (persister == null) { - LOG.info("Cannot publish management-node health as no persister"); - return; - } - - ManagementNodeSyncRecord memento = createManagementNodeSyncRecord(false); - Delta delta = ManagementPlaneSyncRecordDeltaImpl.builder() - .node(memento) - .setMaster(ownNodeId) - .build(); - persister.delta(delta); - if (LOG.isTraceEnabled()) LOG.trace("Published management-node health: {}", memento); - } - - protected boolean isHeartbeatOk(ManagementNodeSyncRecord masterNode, ManagementNodeSyncRecord meNode) { - if (masterNode==null) return false; - if (meNode==null) { - // we can't confirm it's healthy, but it appears so as far as we can tell - return true; - } - Long timestampMaster = masterNode.getRemoteTimestamp(); - Long timestampMe = meNode.getRemoteTimestamp(); - if (timestampMaster==null || timestampMe==null) return false; - return (timestampMe - timestampMaster) <= getHeartbeatTimeout().toMilliseconds(); - } - - protected ManagementNodeSyncRecord hasHealthyMaster() { - ManagementPlaneSyncRecord memento = loadManagementPlaneSyncRecord(false); - - String nodeId = memento.getMasterNodeId(); - ManagementNodeSyncRecord masterMemento = (nodeId == null) ? null : memento.getManagementNodes().get(nodeId); - - ManagementNodeSyncRecord ourMemento = memento.getManagementNodes().get(ownNodeId); - boolean result = masterMemento != null && masterMemento.getStatus() == ManagementNodeState.MASTER - && isHeartbeatOk(masterMemento, ourMemento); - - if (LOG.isDebugEnabled()) LOG.debug("Healthy-master check result={}; masterId={}; masterMemento={}; ourMemento={}", - new Object[] {result, nodeId, (masterMemento == null ? "<none>" : masterMemento.toVerboseString()), (ourMemento == null ? "<none>" : ourMemento.toVerboseString())}); - - return (result ? 
masterMemento : null); - } - - /** - * Looks up the state of all nodes in the management plane, and checks if the master is still ok. - * If it's not then determines which node should be promoted to master. If it is ourself, then promotes. - */ - protected void checkMaster(boolean initializing) { - ManagementPlaneSyncRecord memento = loadManagementPlaneSyncRecord(false); - - if (getNodeState()==ManagementNodeState.FAILED || getNodeState()==ManagementNodeState.HOT_BACKUP) { - // if failed or hot backup then we can't promote ourselves, so no point in checking who is master - return; - } - - String currMasterNodeId = memento.getMasterNodeId(); - ManagementNodeSyncRecord currMasterNodeRecord = memento.getManagementNodes().get(currMasterNodeId); - ManagementNodeSyncRecord ownNodeRecord = memento.getManagementNodes().get(ownNodeId); - - ManagementNodeSyncRecord newMasterNodeRecord = null; - boolean demotingSelfInFavourOfOtherMaster = false; - - if (currMasterNodeRecord != null && currMasterNodeRecord.getStatus() == ManagementNodeState.MASTER && isHeartbeatOk(currMasterNodeRecord, ownNodeRecord)) { - // master seems healthy - if (ownNodeId.equals(currMasterNodeId)) { - if (LOG.isTraceEnabled()) LOG.trace("Existing master healthy (us): master={}", currMasterNodeRecord.toVerboseString()); - return; - } else { - if (ownNodeRecord!=null && ownNodeRecord.getStatus() == ManagementNodeState.MASTER) { - LOG.error("Management node "+ownNodeId+" detected master change, stolen from us, deferring to "+currMasterNodeId); - newMasterNodeRecord = currMasterNodeRecord; - demotingSelfInFavourOfOtherMaster = true; - } else { - if (LOG.isTraceEnabled()) LOG.trace("Existing master healthy (remote): master={}", currMasterNodeRecord.toVerboseString()); - return; - } - } - } else if (ownNodeRecord == null || !isHeartbeatOk(ownNodeRecord, ownNodeRecord)) { - // our heartbeats are also out-of-date! perhaps something wrong with persistence? just log, and don't over-react!
- if (ownNodeRecord == null) { - LOG.error("No management node memento for self ("+ownNodeId+"); perhaps persister unwritable? " - + "Master ("+currMasterNodeId+") reported failed but no-op as cannot tell conclusively"); - } else { - LOG.error("This management node ("+ownNodeId+") memento heartbeats out-of-date; perhaps perister unwritable? " - + "Master ("+currMasterNodeId+") reported failed but no-op as cannot tell conclusively" - + ": self="+ownNodeRecord.toVerboseString()); - } - return; - } else if (ownNodeId.equals(currMasterNodeId)) { - // we are supposed to be the master, but seem to be unhealthy! - LOG.warn("This management node ("+ownNodeId+") supposed to be master but reportedly unhealthy? " - + "no-op as expect other node to fix: self="+ownNodeRecord.toVerboseString()); - return; - } - - if (demotingSelfInFavourOfOtherMaster) { - LOG.debug("Master-change for this node only, demoting "+ownNodeRecord.toVerboseString()+" in favour of official master "+newMasterNodeRecord.toVerboseString()); - demoteTo( - BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY) ? - ManagementNodeState.HOT_STANDBY : ManagementNodeState.STANDBY); - return; - } else { - LOG.debug("Detected master heartbeat timeout. Initiating a new master election. Master was " + currMasterNodeRecord); - } - - // Need to choose a new master - newMasterNodeRecord = masterChooser.choose(memento, getHeartbeatTimeout(), ownNodeId); - - String newMasterNodeId = (newMasterNodeRecord == null) ? null : newMasterNodeRecord.getNodeId(); - URI newMasterNodeUri = (newMasterNodeRecord == null) ? null : newMasterNodeRecord.getUri(); - boolean weAreNewMaster = ownNodeId.equals(newMasterNodeId); - - if (LOG.isDebugEnabled()) { - LOG.debug("Management node master-change required: newMaster={}; oldMaster={}; plane={}, self={}; heartbeatTimeout={}", - new Object[] { - (newMasterNodeRecord == null ?
"<none>" : newMasterNodeRecord.toVerboseString()), - (currMasterNodeRecord == null ? currMasterNodeId+" (no memento)": currMasterNodeRecord.toVerboseString()), - memento, - ownNodeRecord.toVerboseString(), - getHeartbeatTimeout() - }); - } - String message = "Management node "+ownNodeId+" detected "; - String currMasterSummary = currMasterNodeId + "(" + (currMasterNodeRecord==null ? "<none>" : timestampString(currMasterNodeRecord.getRemoteTimestamp())) + ")"; - if (weAreNewMaster && (ownNodeRecord.getStatus() == ManagementNodeState.MASTER)) { - LOG.warn(message + "we must reassert master status, as was stolen and then failed at "+ - (currMasterNodeRecord==null ? "a node which has gone away" : currMasterSummary)); - publishPromotionToMaster(); - publishHealth(); - return; - } - - if (!initializing) { - if (weAreNewMaster) { - message += "we should be master, changing from "; - } - else if (currMasterNodeRecord==null && newMasterNodeId==null) message += "master change attempted but no candidates "; - else message += "master change, from "; - message += currMasterSummary + " to " - + (newMasterNodeId == null ? "<none>" : - (weAreNewMaster ? "us " : "") - + newMasterNodeId + " (" + timestampString(newMasterNodeRecord.getRemoteTimestamp()) + ")" - + (newMasterNodeUri!=null ?
" "+newMasterNodeUri : "") ); - // always log, if you're looking at a standby node it's useful to see the new master's URL - LOG.info(message); - } - - // New master is ourself: promote - if (weAreNewMaster) { - promoteToMaster(); - } - } - - /** Formats a remote timestamp as "millis / rounded-duration ago"; returns null when the timestamp is null. */ - private static String timestampString(Long remoteTimestamp) { - if (remoteTimestamp==null) return null; - return remoteTimestamp+" / "+Time.makeTimeStringRounded( Duration.sinceUtc(remoteTimestamp))+" ago"; - } - - /** - * Promotes this node to master: notifies the promotion listener (if set; non-fatal listener errors are logged and ignored), - * sets the internal state to MASTER, publishes the promotion, then rebinds and starts the rebind manager. - * If the rebind throws, this node is demoted to FAILED and the exception is rethrown. - * Does nothing beyond logging a warning when this manager is not running. - */ - protected void promoteToMaster() { - if (!running) { - LOG.warn("Ignoring promote-to-master request, as HighAvailabilityManager is not running"); - return; - } - - if (promotionListener != null) { - try { - promotionListener.promotingToMaster(); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - LOG.warn("Problem in promption-listener (continuing)", e); - } - } - setInternalNodeState(ManagementNodeState.MASTER); - publishPromotionToMaster(); - try { - managementContext.getRebindManager().rebind(managementContext.getCatalogClassLoader(), null, getInternalNodeState()); - } catch (Exception e) { - LOG.error("Management node "+managementContext.getManagementNodeId()+" enountered problem during rebind when promoting self to master; demoting to FAILED and rethrowing: "+e); - demoteTo(ManagementNodeState.FAILED); - throw Exceptions.propagate(e); - } - managementContext.getRebindManager().start(); - } - - /** Creates a DEMOTION backup (LOCAL copy mode) when PERSISTENCE_BACKUPS_REQUIRED_ON_DEMOTION is set in the brooklyn properties. */ - protected void backupOnDemotionIfNeeded() { - if (managementContext.getBrooklynProperties().getConfig(BrooklynServerConfig.PERSISTENCE_BACKUPS_REQUIRED_ON_DEMOTION)) { - BrooklynPersistenceUtils.createBackup(managementContext, CreateBackupMode.DEMOTION, MementoCopyMode.LOCAL); - } - } - - /** @deprecated since 0.7.0, use {@link #demoteTo(ManagementNodeState)} */ @Deprecated - protected void demoteToFailed() { - demoteTo(ManagementNodeState.FAILED); - } - /** @deprecated since 0.7.0, use {@link #demoteTo(ManagementNodeState)} */ @Deprecated - protected void demoteToStandby(boolean hot) { - demoteTo(hot ?
ManagementNodeState.HOT_STANDBY : ManagementNodeState.STANDBY); - } - - /** - * Demotes this node to the given state. If this node was master a backup is taken first - * (see {@link #backupOnDemotionIfNeeded()}); then the internal state is set, persistence and read-only mode - * are stopped, all managed items are cleared, and the demotion is published. - * For HOT_STANDBY the internal state is first set to STANDBY; for HOT_STANDBY and HOT_BACKUP the hot proxy is - * then activated via {@link #activateHotProxy(ManagementNodeState)} and health is published afterwards. - * The request is ignored (with a warning) when this manager is not running, unless the target state is FAILED. - * - * @throws IllegalStateException if the target state is not one of FAILED, HOT_BACKUP, STANDBY or HOT_STANDBY - */ - protected void demoteTo(ManagementNodeState toState) { - if (toState!=ManagementNodeState.FAILED && !running) { - LOG.warn("Ignoring demote-from-master request, as HighAvailabilityManager is no longer running"); - return; - } - boolean wasMaster = (getInternalNodeState() == ManagementNodeState.MASTER); - if (wasMaster) backupOnDemotionIfNeeded(); - // TODO target may be RO ? - ManagementTransitionMode mode = ManagementTransitionMode.transitioning( - wasMaster ? BrooklynObjectManagementMode.MANAGED_PRIMARY : BrooklynObjectManagementMode.LOADED_READ_ONLY, - BrooklynObjectManagementMode.UNMANAGED_PERSISTED); - - nodeStateTransitionComplete = false; - - switch (toState) { - case FAILED: - case HOT_BACKUP: - case STANDBY: - setInternalNodeState(toState); break; - case HOT_STANDBY: - setInternalNodeState(ManagementNodeState.STANDBY); break; - default: - throw new IllegalStateException("Illegal target state: "+toState); - } - onDemotionStopItems(mode); - nodeStateTransitionComplete = true; - publishDemotion(wasMaster); - - if (toState==ManagementNodeState.HOT_BACKUP || toState==ManagementNodeState.HOT_STANDBY) { - nodeStateTransitionComplete = false; - try { - activateHotProxy(toState).get(); - } finally { - nodeStateTransitionComplete = true; - } - publishHealth(); - } - } - - /** Stops persistence and read-only mode on the rebind manager, then clears all managed items using the given transition mode. */ - protected void onDemotionStopItems(ManagementTransitionMode mode) { - // stop persistence and remove all apps etc - managementContext.getRebindManager().stopPersistence(); - managementContext.getRebindManager().stopReadOnly(); - clearManagedItems(mode); - - // tasks are cleared as part of unmanaging entities above - } - - /** clears all managed items from the management context; same items destroyed as in the course of a rebind cycle */ - protected void clearManagedItems(ManagementTransitionMode mode) { - // start with the root applications - for (Application app: managementContext.getApplications()) { - if
(((EntityInternal)app).getManagementSupport().isDeployed()) { - ((LocalEntityManager)((EntityInternal)app).getManagementContext().getEntityManager()).unmanage(app, mode); - } - } - // for active management, call above will remove recursively at present, - // but for read-only, and if we stop recursively, go through them all - for (Entity entity: managementContext.getEntityManager().getEntities()) { - ((LocalEntityManager)managementContext.getEntityManager()).unmanage(entity, mode); - } - - // again, for locations, call unmanage on parents first - for (Location loc: managementContext.getLocationManager().getLocations()) { - if (loc.getParent()==null) - ((LocationManagerInternal)managementContext.getLocationManager()).unmanage(loc, mode); - } - for (Location loc: managementContext.getLocationManager().getLocations()) { - ((LocationManagerInternal)managementContext.getLocationManager()).unmanage(loc, mode); - } - - ((BasicBrooklynCatalog)managementContext.getCatalog()).reset(CatalogDto.newEmptyInstance("<reset-by-ha-status-change>")); - } - - /** @deprecated since 0.7.0, use {@link #activateHotProxy(ManagementNodeState)} */ @Deprecated - protected boolean attemptHotStandby() { - return activateHotProxy(ManagementNodeState.HOT_STANDBY).getWithoutError(); - } - - /** Starts hot standby or hot backup, in foreground - * <p> - * In the case of the former, the caller is responsible for publishing health afterwards, - * but if it fails, this method will {@link #demoteTo(ManagementNodeState)} {@link ManagementNodeState#FAILED}.
- * <p> - * @return whether the requested {@link ManagementNodeState} was possible; - * (if not, errors should be stored elsewhere), callers may want to rethrow */ - protected ReferenceWithError<Boolean> activateHotProxy(ManagementNodeState toState) { - try { - Preconditions.checkState(nodeStateTransitionComplete==false, "Must be in transitioning state to go into "+toState); - setInternalNodeState(toState); - managementContext.getRebindManager().startReadOnly(toState); - - return ReferenceWithError.newInstanceWithoutError(true); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - LOG.warn("Unable to change "+ownNodeId+" to "+toState+", switching to FAILED: "+e, e); - demoteTo(ManagementNodeState.FAILED); - return ReferenceWithError.newInstanceThrowingError(false, e); - } - } - - /** Loads the management-plane sync record (see the internal loader below) and remembers it in {@code lastSyncRecord} before returning it. */ - @Override - public ManagementPlaneSyncRecord loadManagementPlaneSyncRecord(boolean useLocalKnowledgeForThisNode) { - ManagementPlaneSyncRecord record = loadManagementPlaneSyncRecordInternal(useLocalKnowledgeForThisNode); - lastSyncRecord = record; - return record; - } - - /** - * Builds or loads the management-plane sync record. - * When HA is disabled the record describes only this node; when there is no persister an empty record is returned. - * Otherwise the record is read from the persister, retrying immediately on {@link IOException} up to 5 attempts in total. - * If {@code useLocalKnowledgeForThisNode} is set, this node's entry is overlaid with a freshly created local record, - * other nodes are passed through {@link MarkAwolNodes} (when this node has a remote timestamp), and this node is - * reported as master when its transition target state is MASTER. - * - * @throws IllegalStateException if every load attempt fails; the last IOException is the cause - */ - private ManagementPlaneSyncRecord loadManagementPlaneSyncRecordInternal(boolean useLocalKnowledgeForThisNode) { - if (disabled) { - // if HA is disabled, then we are the only node - no persistence; just load a memento to describe this node - Builder builder = ManagementPlaneSyncRecordImpl.builder() - .node(createManagementNodeSyncRecord(true)); - if (getTransitionTargetNodeState() == ManagementNodeState.MASTER) { - builder.masterNodeId(ownNodeId); - } - return builder.build(); - } - if (persister == null) { - // e.g.
web-console may be polling before we've started up - LOG.debug("High availablity manager has no persister; returning empty record"); - return ManagementPlaneSyncRecordImpl.builder().build(); - } - - int maxLoadAttempts = 5; - Exception lastException = null; - Stopwatch timer = Stopwatch.createStarted(); - - for (int i = 0; i < maxLoadAttempts; i++) { - try { - ManagementPlaneSyncRecord result = persister.loadSyncRecord(); - - if (useLocalKnowledgeForThisNode) { - // Report this node's most recent state, and detect AWOL nodes - ManagementNodeSyncRecord me = BasicManagementNodeSyncRecord.builder() - .from(result.getManagementNodes().get(ownNodeId), true) - .from(createManagementNodeSyncRecord(false), true) - .build(); - Iterable<ManagementNodeSyncRecord> allNodes = result.getManagementNodes().values(); - if (me.getRemoteTimestamp()!=null) - allNodes = Iterables.transform(allNodes, new MarkAwolNodes(me)); - Builder builder = ManagementPlaneSyncRecordImpl.builder() - .masterNodeId(result.getMasterNodeId()) - .nodes(allNodes); - builder.node(me); - if (getTransitionTargetNodeState() == ManagementNodeState.MASTER) { - builder.masterNodeId(ownNodeId); - } - result = builder.build(); - } - - if (i>0) { - managementStateReadPersistenceMetrics.noteError("Succeeded only on attempt "+(i+1)+": "+lastException); - } - managementStateReadPersistenceMetrics.noteSuccess(Duration.of(timer)); - return result; - } catch (IOException e) { - if (i < (maxLoadAttempts - 1)) { - if (LOG.isDebugEnabled()) LOG.debug("Problem loading mangement-plane memento attempt "+(i+1)+"/"+maxLoadAttempts+"; retrying", e); - } - lastException = e; - } - } - String message = "Failed to load mangement-plane memento "+maxLoadAttempts+" consecutive times"; - managementStateReadPersistenceMetrics.noteError(message+": "+lastException); - managementStateReadPersistenceMetrics.noteFailure(Duration.of(timer)); - - throw new IllegalStateException(message, lastException); - } - - /** - * Builds a sync record for this node from the Brooklyn version, node id, current state, priority and management URI, - * stamped with the local ticker time as the local timestamp. - * The remote timestamp is the same local time when {@code useLocalTimestampAsRemoteTimestamp} is true; - * otherwise it is read from {@code optionalRemoteTickerUtc} when that is set, and left unset when it is not. - */ - protected ManagementNodeSyncRecord
createManagementNodeSyncRecord(boolean useLocalTimestampAsRemoteTimestamp) { - long timestamp = currentTimeMillis(); - brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord.Builder builder = BasicManagementNodeSyncRecord.builder() - .brooklynVersion(BrooklynVersion.get()) - .nodeId(ownNodeId) - .status(getNodeState()) - .priority(getPriority()) - .localTimestamp(timestamp) - .uri(managementContext.getManagementNodeUri().orNull()); - if (useLocalTimestampAsRemoteTimestamp) - builder.remoteTimestamp(timestamp); - else if (optionalRemoteTickerUtc!=null) { - builder.remoteTimestamp(optionalRemoteTickerUtc.read()); - } - return builder.build(); - } - - /** - * Gets the current time, using the {@link #localTickerUtc}. Normally this is equivalent of {@link System#currentTimeMillis()}, - * but in test environments a custom {@link Ticker} can be injected via {@link #setLocalTicker(Ticker)} to allow testing of - * specific timing scenarios. - */ - protected long currentTimeMillis() { - return localTickerUtc.read(); - } - - /** - * Infers the health of a node - if it last reported itself as healthy (standby or master), but we haven't heard - * from it in a long time then report that node as failed; otherwise report its health as-is.
- */ - private class MarkAwolNodes implements Function<ManagementNodeSyncRecord, ManagementNodeSyncRecord> { - private final ManagementNodeSyncRecord referenceNode; - private MarkAwolNodes(ManagementNodeSyncRecord referenceNode) { - this.referenceNode = referenceNode; - } - @Nullable - @Override - public ManagementNodeSyncRecord apply(@Nullable ManagementNodeSyncRecord input) { - if (input == null) return null; - if (!(input.getStatus() == ManagementNodeState.STANDBY || input.getStatus() == ManagementNodeState.HOT_STANDBY || input.getStatus() == ManagementNodeState.MASTER || input.getStatus() == ManagementNodeState.HOT_BACKUP)) return input; - if (isHeartbeatOk(input, referenceNode)) return input; - return BasicManagementNodeSyncRecord.builder() - .from(input) - .status(ManagementNodeState.FAILED) - .build(); - } - } - - @Override - public String toString() { - return super.toString()+"[node:"+ownNodeId+";running="+running+"]"; - } - - /** - * Returns a map with the node state, uptime, current and start times and an HA section (priority, poll period, - * heartbeat timeout, state history), merged with the rebind manager's metrics and the read/write - * management-state persistence metrics. - */ - @Override - public Map<String,Object> getMetrics() { - Map<String,Object> result = MutableMap.of(); - - result.put("state", getNodeState()); - result.put("uptime", Time.makeTimeStringRounded(Duration.millis(currentTimeMillis()-startTimeUtc))); - result.put("currentTimeUtc", currentTimeMillis()); - result.put("startTimeUtc", startTimeUtc); - result.put("highAvailability", MutableMap.<String,Object>of( - "priority", getPriority(), - "pollPeriod", getPollPeriod().toMilliseconds(), - "heartbeatTimeout", getHeartbeatTimeout().toMilliseconds(), - "history", nodeStateHistory)); - - result.putAll(managementContext.getRebindManager().getMetrics()); - result.put("managementStatePersistence", - MutableMap.of("read", managementStateReadPersistenceMetrics, "write", managementStateWritePersistenceMetrics)); - - return result; - } - - /** Returns the "timestamp" value of the first entry in {@code nodeStateHistory}, or 0 when the history is empty. */ - @Override - public long getLastStateChange() { - if (nodeStateHistory.size() > 0) { - return (Long)nodeStateHistory.get(0).get("timestamp"); - } else { - return 0; - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/ha/ManagementPlaneSyncRecordDeltaImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/ha/ManagementPlaneSyncRecordDeltaImpl.java b/core/src/main/java/brooklyn/management/ha/ManagementPlaneSyncRecordDeltaImpl.java deleted file mode 100644 index 029ffb0..0000000 --- a/core/src/main/java/brooklyn/management/ha/ManagementPlaneSyncRecordDeltaImpl.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.management.ha; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import java.util.Collection; - -import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister.Delta; - -import com.google.common.annotations.Beta; -import com.google.common.collect.Sets; - -/** - * @since 0.7.0 - * - * @author aled - */ -@Beta -public class ManagementPlaneSyncRecordDeltaImpl implements Delta { - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - private Collection<ManagementNodeSyncRecord> nodes = Sets.newLinkedHashSet(); - private Collection <String> removedNodeIds = Sets.newLinkedHashSet(); - private MasterChange masterChange = MasterChange.NO_CHANGE; - private String master; - private String expectedOldMaster; - - public Builder node(ManagementNodeSyncRecord node) { - nodes.add(checkNotNull(node, "node")); return this; - } - public Builder removedNodeId(String id) { - removedNodeIds.add(checkNotNull(id, "id")); return this; - } - public Builder setMaster(String nodeId) { - masterChange = MasterChange.SET_MASTER; - master = checkNotNull(nodeId, "masterId"); - return this; - } - public Builder clearMaster(String optionalExpectedNodeId) { - masterChange = MasterChange.CLEAR_MASTER; - this.expectedOldMaster = optionalExpectedNodeId; - return this; - } - public Delta build() { - return new ManagementPlaneSyncRecordDeltaImpl(this); - } - } - - private final Collection<ManagementNodeSyncRecord> nodes; - private final Collection <String> removedNodeIds; - private final MasterChange masterChange; - private String masterId; - private String expectedOldMaster; - - ManagementPlaneSyncRecordDeltaImpl(Builder builder) { - nodes = builder.nodes; - removedNodeIds = builder.removedNodeIds; - masterChange = builder.masterChange; - masterId = builder.master; 
- this.expectedOldMaster = builder.expectedOldMaster; - checkState((masterChange == MasterChange.SET_MASTER) ? (masterId != null) : (masterId == null), - "invalid combination: change=%s; masterId=%s", masterChange, masterId); - } - - @Override - public Collection<ManagementNodeSyncRecord> getNodes() { - return nodes; - } - - @Override - public Collection<String> getRemovedNodeIds() { - return removedNodeIds; - } - - @Override - public MasterChange getMasterChange() { - return masterChange; - } - - @Override - public String getNewMasterOrNull() { - return masterId; - } - - @Override - public String getExpectedMasterToClear() { - return expectedOldMaster; - } - - @Override - public String toString() { - return getClass().getCanonicalName()+"["+ - (masterChange!=null && masterChange != MasterChange.NO_CHANGE ? - masterChange+": "+expectedOldMaster+"->"+masterId+"; " : "")+ - "nodes: "+nodes+ - (removedNodeIds!=null && !removedNodeIds.isEmpty() ? "; removing: "+removedNodeIds : "") - +"]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/ha/ManagementPlaneSyncRecordPersisterToObjectStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/ha/ManagementPlaneSyncRecordPersisterToObjectStore.java b/core/src/main/java/brooklyn/management/ha/ManagementPlaneSyncRecordPersisterToObjectStore.java deleted file mode 100644 index dff815e..0000000 --- a/core/src/main/java/brooklyn/management/ha/ManagementPlaneSyncRecordPersisterToObjectStore.java +++ /dev/null @@ -1,355 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.management.ha; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.IOException; -import java.util.Date; -import java.util.List; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.brooklyn.api.management.ManagementContext; -import org.apache.brooklyn.api.management.ha.HighAvailabilityMode; -import org.apache.brooklyn.api.management.ha.ManagementNodeState; -import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.rebind.persister.MementoSerializer; -import brooklyn.entity.rebind.persister.PersistenceObjectStore; -import brooklyn.entity.rebind.persister.PersistenceObjectStore.StoreObjectAccessorWithLock; -import brooklyn.entity.rebind.persister.RetryingMementoSerializer; -import brooklyn.entity.rebind.persister.StoreObjectAccessorLocking; -import brooklyn.entity.rebind.persister.XmlMementoSerializer; -import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord; -import brooklyn.entity.rebind.plane.dto.ManagementPlaneSyncRecordImpl; -import brooklyn.util.exceptions.Exceptions; 
-import brooklyn.util.text.Strings; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.annotations.Beta; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Maps; - -/** - * Structure of files is: - * <ul> - * <li>{@code plane/} - top-level directory - * <ul> - * <li>{@code master} - contains the id of the management-node that is currently master - * <li>{@code change.log} - log of changes made - * <li>{@code nodes/} - sub-directory, containing one file per management-node - * <ul> - * <li>{@code a9WiuVKp} - file named after the management-node's id, containing the management node's current state - * <li>{@code E1eDXQF3} - * </ul> - * </ul> - * </ul> - * - * All writes are done synchronously. - * - * @since 0.7.0 - * - * @author aled - */ -@Beta -public class ManagementPlaneSyncRecordPersisterToObjectStore implements ManagementPlaneSyncRecordPersister { - - // TODO Multiple node appending to change.log could cause strange interleaving, or perhaps even data loss? - // But this file is not critical to functionality. - - // TODO Should ManagementPlaneSyncRecordPersister.Delta be different so can tell what is a significant event, - // and thus log it in change.log - currently only subset of significant things being logged. - - private static final Logger LOG = LoggerFactory.getLogger(ManagementPlaneSyncRecordPersisterToObjectStore.class); - - private static final Duration SHUTDOWN_TIMEOUT = Duration.TEN_SECONDS; - private static final Duration SYNC_WRITE_TIMEOUT = Duration.TEN_SECONDS; - public static final String NODES_SUB_PATH = "nodes"; - - // TODO Leak if we go through lots of managers; but tiny! 
- private final ConcurrentMap<String, StoreObjectAccessorWithLock> nodeWriters = Maps.newConcurrentMap(); - - private StoreObjectAccessorWithLock masterWriter; - private StoreObjectAccessorWithLock changeLogWriter; - - private ManagementContext mgmt; - private final PersistenceObjectStore objectStore; - private final MementoSerializer<Object> serializer; - - private static final int MAX_SERIALIZATION_ATTEMPTS = 5; - - private boolean started = false; - private volatile boolean running = true; - - @VisibleForTesting - /** allows, when testing, to be able to override file times / blobstore times with time from the ticker */ - private boolean preferRemoteTimestampInMemento = false; - - /** - * @param mgmt not used much at present but handy to ensure we know it so that obj store is prepared - * @param objectStore the objectStore use to read/write management-plane data; - * this must have been {@link PersistenceObjectStore#prepareForSharedUse(brooklyn.entity.rebind.persister.PersistMode, HighAvailabilityMode)} - * @param classLoader ClassLoader to use when deserializing data - */ - public ManagementPlaneSyncRecordPersisterToObjectStore(ManagementContext mgmt, PersistenceObjectStore objectStore, ClassLoader classLoader) { - this.mgmt = mgmt; - this.objectStore = checkNotNull(objectStore, "objectStore"); - - MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(checkNotNull(classLoader, "classLoader")); - this.serializer = new RetryingMementoSerializer<Object>(rawSerializer, MAX_SERIALIZATION_ATTEMPTS); - - objectStore.createSubPath(NODES_SUB_PATH); - - LOG.debug("ManagementPlaneMemento-persister will use store "+objectStore); - } - - protected synchronized void init() { - if (!started) { - started = true; - //Leading slash causes problems in SL, it's not a correct file name so remove it. - //But once removed we can't load the master file from existing persistence stores. 
- //Try to detect if the old file exists, if so use old-style names, otherwise use the correct names. - masterWriter = new StoreObjectAccessorLocking(objectStore.newAccessor("/master")); - if (masterWriter.get() != null) { - changeLogWriter = new StoreObjectAccessorLocking(objectStore.newAccessor("/change.log")); - } else { - masterWriter = new StoreObjectAccessorLocking(objectStore.newAccessor("master")); - changeLogWriter = new StoreObjectAccessorLocking(objectStore.newAccessor("change.log")); - } - } - } - - @VisibleForTesting - public void preferRemoteTimestampInMemento() { - preferRemoteTimestampInMemento = true; - } - - @Override - public void stop() { - running = false; - try { - for (StoreObjectAccessorWithLock writer : nodeWriters.values()) { - try { - writer.waitForCurrentWrites(SHUTDOWN_TIMEOUT); - } catch (TimeoutException e) { - LOG.warn("Timeout during shutdown, waiting for write of "+writer+"; continuing"); - } - } - try { - masterWriter.waitForCurrentWrites(SHUTDOWN_TIMEOUT); - } catch (TimeoutException e) { - LOG.warn("Timeout during shutdown, waiting for write of "+masterWriter+"; continuing"); - } - } catch (InterruptedException e) { - throw Exceptions.propagate(e); - } - } - - @Override - public ManagementPlaneSyncRecord loadSyncRecord() throws IOException { - if (!running) { - throw new IllegalStateException("Persister not running; cannot load memento from "+ objectStore.getSummaryName()); - } - init(); - - // Note this is called a lot - every time we check the heartbeats - if (LOG.isTraceEnabled()) LOG.trace("Loading management-plane memento from {}", objectStore.getSummaryName()); - - Stopwatch stopwatch = Stopwatch.createStarted(); - - ManagementPlaneSyncRecordImpl.Builder builder = ManagementPlaneSyncRecordImpl.builder(); - - // Be careful about order: if the master-file says nodeX then nodeX's file must have an up-to-date timestamp. - // Therefore read master file first, followed by the other node-files. 
- String masterNodeId = masterWriter.get(); - if (masterNodeId == null) { - LOG.debug("No master-memento deserialized from file "+masterWriter+"; ignoring and continuing (normal on startup, should cause an error later in live operation)"); - } else { - builder.masterNodeId(masterNodeId); - } - - // Load node-files - List<String> nodeFiles = objectStore.listContentsWithSubPath(NODES_SUB_PATH); - LOG.trace("Loading nodes from {}; {} nodes.", - new Object[]{objectStore.getSummaryName(), nodeFiles.size()}); - - for (String nodeFile : nodeFiles) { - PersistenceObjectStore.StoreObjectAccessor objectAccessor = objectStore.newAccessor(nodeFile); - String nodeContents = null; - Exception problem = null; - try { - nodeContents = objectAccessor.get(); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - problem = e; - } - if (problem!=null || Strings.isBlank(nodeContents)) { - // happens if node has gone away, or if FileBasedObjectStore.moveFile is not atomic, - // i.e. it has deleted but not updated it yet - if (objectAccessor.exists()) { - throw Exceptions.propagate(new IllegalStateException("Node record "+nodeFile+" could not be read when "+mgmt.getManagementNodeId()+" was scanning", problem)); - } else { - LOG.warn("Node record "+nodeFile+" went away while "+mgmt.getManagementNodeId()+" was scanning, ignoring (it has probably been terminated)"); - // if file was deleted, silently ignore - continue; - } - } - ManagementNodeSyncRecord memento = (ManagementNodeSyncRecord) serializer.fromString(nodeContents); - if (memento == null) { - // shouldn't happen - throw Exceptions.propagate(new IllegalStateException("Node record "+nodeFile+" could not be deserialized when "+mgmt.getManagementNodeId()+" was scanning: "+nodeContents, problem)); - } else { - if (memento.getRemoteTimestamp()!=null && preferRemoteTimestampInMemento) { - // in test mode, the remote timestamp is stored in the file - } else { - if (memento.getRemoteTimestamp()!=null) { - LOG.debug("Ignoring remote 
timestamp in memento file ("+memento+"); looks like this data has been manually copied in"); - } - Date lastModifiedDate = objectAccessor.getLastModifiedDate(); - ((BasicManagementNodeSyncRecord)memento).setRemoteTimestamp(lastModifiedDate!=null ? lastModifiedDate.getTime() : null); - } - builder.node(memento); - } - } - - if (LOG.isDebugEnabled()) LOG.trace("Loaded management-plane memento; {} nodes, took {}", - nodeFiles.size(), - Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS))); - return builder.build(); - } - - @Override - public void delta(Delta delta) { - if (!running) { - if (LOG.isDebugEnabled()) LOG.debug("Persister not running; ignoring checkpointed delta of manager-memento"); - return; - } - init(); - - Stopwatch stopwatch = Stopwatch.createStarted(); - if (LOG.isTraceEnabled()) LOG.trace("Checkpointing delta of manager-memento; updating {}", delta); - - for (ManagementNodeSyncRecord m : delta.getNodes()) { - persist(m); - } - for (String id : delta.getRemovedNodeIds()) { - deleteNode(id); - } - switch (delta.getMasterChange()) { - case NO_CHANGE: - break; // no-op - case SET_MASTER: - persistMaster(checkNotNull(delta.getNewMasterOrNull()), null); - break; - case CLEAR_MASTER: - persistMaster("", delta.getExpectedMasterToClear()); - break; // no-op - default: - throw new IllegalStateException("Unknown state for master-change: "+delta.getMasterChange()); - } - if (LOG.isDebugEnabled()) LOG.debug("Checkpointed delta of manager-memento in "+Time.makeTimeStringRounded(stopwatch)+": "+delta); - } - - private void persistMaster(String nodeId, String optionalExpectedId) { - if (optionalExpectedId!=null) { - String currentRemoteMaster = masterWriter.get(); - if (currentRemoteMaster==null) { - // okay to have nothing at remote - } else if (!currentRemoteMaster.trim().equals(optionalExpectedId.trim())) { - LOG.warn("Master at server is "+(Strings.isBlank(currentRemoteMaster) ? 
"<none>" : currentRemoteMaster)+"; expected "+optionalExpectedId+" " - + (Strings.isNonBlank(nodeId) ? "and would set as "+nodeId : "and would clear") - + ", so not applying (yet)"); - return; - } - } - masterWriter.put(nodeId); - try { - masterWriter.waitForCurrentWrites(SYNC_WRITE_TIMEOUT); - } catch (Exception e) { - throw Exceptions.propagate(e); - } - changeLogWriter.append(Time.makeDateString() + ": set master to " + nodeId + "\n"); - try { - changeLogWriter.waitForCurrentWrites(SYNC_WRITE_TIMEOUT); - } catch (Exception e) { - throw Exceptions.propagate(e); - } - } - - @Override - @VisibleForTesting - public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException { - for (StoreObjectAccessorWithLock writer : nodeWriters.values()) { - writer.waitForCurrentWrites(timeout); - } - masterWriter.waitForCurrentWrites(timeout); - } - - public void checkpoint(ManagementPlaneSyncRecord record) { - init(); - for (ManagementNodeSyncRecord node : record.getManagementNodes().values()) { - // Check included in case the node in the memento is the one being initialised by - // BrooklynLauncher in the copy state command. 
- if (!ManagementNodeState.INITIALIZING.equals(node.getStatus()) && node.getNodeId() != null) { - persist(node); - } - } - } - - private void persist(ManagementNodeSyncRecord node) { - StoreObjectAccessorWithLock writer = getOrCreateNodeWriter(node.getNodeId()); - boolean fileExists = writer.exists(); - writer.put(serializer.toString(node)); - try { - writer.waitForCurrentWrites(SYNC_WRITE_TIMEOUT); - } catch (Exception e) { - throw Exceptions.propagate(e); - } - if (!fileExists) { - changeLogWriter.append(Time.makeDateString()+": created node "+node.getNodeId()+"\n"); - } - if (node.getStatus() == ManagementNodeState.TERMINATED || node.getStatus() == ManagementNodeState.FAILED) { - changeLogWriter.append(Time.makeDateString()+": set node "+node.getNodeId()+" status to "+node.getStatus()+"\n"); - } - } - - private void deleteNode(String nodeId) { - getOrCreateNodeWriter(nodeId).delete(); - changeLogWriter.append(Time.makeDateString()+": deleted node "+nodeId+"\n"); - } - - private StoreObjectAccessorWithLock getOrCreateNodeWriter(String nodeId) { - PersistenceObjectStore.StoreObjectAccessorWithLock writer = nodeWriters.get(nodeId); - if (writer == null) { - nodeWriters.putIfAbsent(nodeId, - new StoreObjectAccessorLocking(objectStore.newAccessor(NODES_SUB_PATH+"/"+nodeId))); - writer = nodeWriters.get(nodeId); - } - return writer; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/ha/MasterChooser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/ha/MasterChooser.java b/core/src/main/java/brooklyn/management/ha/MasterChooser.java deleted file mode 100644 index fdde8b9..0000000 --- a/core/src/main/java/brooklyn/management/ha/MasterChooser.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.management.ha; - -import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord; -import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord; - -import brooklyn.util.time.Duration; - -import com.google.common.annotations.Beta; - -/** - * For choosing which management node to promote, when master detected as failed or stopped. - * - * @since 0.7.0 - * - * @author aled - */ -@Beta -public interface MasterChooser { - - /** - * Chooses the management node that should become master. - * - * @param memento the current view of the management plane (all known node records and the recorded master) - * @param heartbeatTimeout the heartbeat timeout in force; presumably used to rule out stale candidates - confirm against implementations - * @param ownNodeId the id of the node making the call - * @return the record of the node to promote; NOTE(review): callers appear to treat null as "no candidate" - confirm against implementations - */ - ManagementNodeSyncRecord choose(ManagementPlaneSyncRecord memento, Duration heartbeatTimeout, String ownNodeId); - -}
