http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/BrooklynPersistenceUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/BrooklynPersistenceUtils.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/BrooklynPersistenceUtils.java deleted file mode 100644 index 55dc2f5..0000000 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/BrooklynPersistenceUtils.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.core.mgmt.rebind.persister; - -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.api.catalog.CatalogItem; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.location.LocationSpec; -import org.apache.brooklyn.api.mgmt.ManagementContext; -import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode; -import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState; -import org.apache.brooklyn.api.mgmt.ha.ManagementPlaneSyncRecord; -import org.apache.brooklyn.api.mgmt.ha.MementoCopyMode; -import org.apache.brooklyn.api.mgmt.rebind.PersistenceExceptionHandler; -import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData; -import org.apache.brooklyn.api.mgmt.rebind.mementos.Memento; -import org.apache.brooklyn.api.objs.BrooklynObject; -import org.apache.brooklyn.api.objs.BrooklynObjectType; -import org.apache.brooklyn.api.policy.Policy; -import org.apache.brooklyn.api.sensor.Enricher; -import org.apache.brooklyn.api.sensor.Feed; -import org.apache.brooklyn.core.mgmt.ha.ManagementPlaneSyncRecordPersisterToObjectStore; -import org.apache.brooklyn.core.mgmt.internal.LocalLocationManager; -import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; -import org.apache.brooklyn.core.mgmt.rebind.PersistenceExceptionHandlerImpl; -import org.apache.brooklyn.core.mgmt.rebind.transformer.CompoundTransformer; -import org.apache.brooklyn.core.mgmt.rebind.transformer.CompoundTransformerLoader; -import org.apache.brooklyn.core.objs.BrooklynObjectInternal; -import org.apache.brooklyn.core.server.BrooklynServerConfig; -import org.apache.brooklyn.core.server.BrooklynServerPaths; -import org.apache.brooklyn.entity.core.Entities; -import org.apache.brooklyn.entity.core.EntityInternal; -import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation; -import 
org.apache.brooklyn.util.core.ResourceUtils; // completes the "import" keyword that ends the preceding line
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;

import com.google.common.annotations.Beta;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;

/**
 * Static helpers for creating {@link PersistenceObjectStore}s, building mementos from a
 * {@link ManagementContext}, and writing those mementos (including back-ups) to a store.
 */
public class BrooklynPersistenceUtils {

    private static final Logger log = LoggerFactory.getLogger(BrooklynPersistenceUtils.class);

    /** The Brooklyn object types, listed in the standard persistence order. */
    @Beta
    public static final List<BrooklynObjectType> STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER = ImmutableList.of(
        BrooklynObjectType.ENTITY, BrooklynObjectType.LOCATION, BrooklynObjectType.POLICY,
        BrooklynObjectType.ENRICHER, BrooklynObjectType.FEED, BrooklynObjectType.CATALOG_ITEM);

    /**
     * Creates a {@link PersistenceObjectStore} for general-purpose use.
     * Delegates to the five-argument overload with {@link PersistMode#AUTO} and
     * {@link HighAvailabilityMode#STANDBY}.
     */
    public static PersistenceObjectStore newPersistenceObjectStore(ManagementContext managementContext,
            String locationSpec, String locationContainer) {
        return newPersistenceObjectStore(managementContext, locationSpec, locationContainer,
            PersistMode.AUTO, HighAvailabilityMode.STANDBY);
    }

    /**
     * Creates a {@link PersistenceObjectStore} for use with a specified set of modes.
     * <p>
     * A blank {@code locationSpec} yields a store on an unmanaged localhost location; otherwise the
     * spec is resolved through the location registry. The returned store has had the management
     * context injected and {@code prepareForSharedUse} invoked with the given modes.
     *
     * @throws IllegalArgumentException if the resolved location is not a {@link LocationWithObjectStore}
     */
    public static PersistenceObjectStore newPersistenceObjectStore(ManagementContext managementContext,
            String locationSpec, String locationContainer, PersistMode persistMode, HighAvailabilityMode highAvailabilityMode) {
        PersistenceObjectStore destinationObjectStore;
        locationContainer = BrooklynServerPaths.newMainPersistencePathResolver(managementContext)
            .location(locationSpec).dir(locationContainer).resolve();

        Location location;
        if (Strings.isBlank(locationSpec)) {
            location = managementContext.getLocationManager().createLocation(
                LocationSpec.create(LocalhostMachineProvisioningLocation.class)
                    .configure(LocalLocationManager.CREATE_UNMANAGED, true));
        } else {
            location = managementContext.getLocationRegistry().resolve(locationSpec, false, null).get();
            if (!(location instanceof LocationWithObjectStore)) {
                throw new IllegalArgumentException("Destination location "+location+" does not offer a persistent store");
            }
        }
        destinationObjectStore = ((LocationWithObjectStore)location).newPersistenceObjectStore(locationContainer);

        destinationObjectStore.injectManagementContext(managementContext);
        destinationObjectStore.prepareForSharedUse(persistMode, highAvailabilityMode);
        return destinationObjectStore;
    }

    /**
     * Checkpoints the given raw memento to the given store, using a freshly created
     * {@link BrooklynMementoPersisterToObjectStore} with write access enabled.
     */
    public static void writeMemento(ManagementContext managementContext, BrooklynMementoRawData memento,
            PersistenceObjectStore destinationObjectStore) {
        BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(
            destinationObjectStore,
            ((ManagementContextInternal)managementContext).getBrooklynProperties(),
            managementContext.getCatalogClassLoader());
        PersistenceExceptionHandler exceptionHandler = PersistenceExceptionHandlerImpl.builder().build();
        persister.enableWriteAccess();
        persister.checkpoint(memento, exceptionHandler);
    }

    /**
     * Checkpoints the given management-plane record to the given store.
     * Does nothing when {@code optionalPlaneRecord} is null.
     */
    public static void writeManagerMemento(ManagementContext managementContext, ManagementPlaneSyncRecord optionalPlaneRecord,
            PersistenceObjectStore destinationObjectStore) {
        if (optionalPlaneRecord != null) {
            ManagementPlaneSyncRecordPersisterToObjectStore managementPersister = new ManagementPlaneSyncRecordPersisterToObjectStore(
                managementContext, destinationObjectStore, managementContext.getCatalogClassLoader());
            managementPersister.checkpoint(optionalPlaneRecord);
        }
    }

    /**
     * Loads the transformer described at the given URL.
     *
     * @return {@link CompoundTransformer#NOOP} when the URL is blank, otherwise the transformer
     *         parsed from the resource's contents
     */
    public static CompoundTransformer loadTransformer(ResourceUtils resources, String transformationsFileUrl) {
        if (Strings.isBlank(transformationsFileUrl)) {
            return CompoundTransformer.NOOP;
        } else {
            String contents = resources.getResourceAsString(transformationsFileUrl);
            return CompoundTransformerLoader.load(contents);
        }
    }

    /** Returns the memento reported by the instance's rebind support; the instance must be a {@link BrooklynObjectInternal}. */
    public static Memento newObjectMemento(BrooklynObject instance) {
        return ((BrooklynObjectInternal)instance).getRebindSupport().getMemento();
    }

    /**
     * Produces the raw state memento from the requested source.
     *
     * @throws IllegalStateException if {@code source} is {@link MementoCopyMode#AUTO} (callers must resolve it first)
     */
    public static BrooklynMementoRawData newStateMemento(ManagementContext mgmt, MementoCopyMode source) {
        switch (source) {
        case LOCAL:
            return newStateMementoFromLocal(mgmt);
        case REMOTE:
            return mgmt.getRebindManager().retrieveMementoRawData();
        case AUTO:
            throw new IllegalStateException("Copy mode AUTO not supported here");
        }
        throw new IllegalStateException("Should not come here, unknown mode "+source);
    }

    /**
     * Produces the management-plane record from the requested source.
     *
     * @throws IllegalStateException if {@code source} is {@link MementoCopyMode#AUTO} (callers must resolve it first)
     */
    public static ManagementPlaneSyncRecord newManagerMemento(ManagementContext mgmt, MementoCopyMode source) {
        switch (source) {
        case LOCAL:
            return mgmt.getHighAvailabilityManager().getLastManagementPlaneSyncRecord();
        case REMOTE:
            return mgmt.getHighAvailabilityManager().loadManagementPlaneSyncRecord(true);
        case AUTO:
            throw new IllegalStateException("Copy mode AUTO not supported here");
        }
        throw new IllegalStateException("Should not come here, unknown mode "+source);
    }

    /**
     * Serializes the mementos of every location, entity (with its feeds, enrichers and policies)
     * and catalog item known to the management context into a raw-data record.
     */
    private static BrooklynMementoRawData newStateMementoFromLocal(ManagementContext mgmt) {
        BrooklynMementoRawData.Builder result = BrooklynMementoRawData.builder();
        MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(mgmt.getClass().getClassLoader());
        RetryingMementoSerializer<Object> serializer = new RetryingMementoSerializer<Object>(rawSerializer, 1);

        for (Location instance: mgmt.getLocationManager().getLocations()) {
            result.location(instance.getId(), serializer.toString(newObjectMemento(instance)));
        }
        for (Entity instance: mgmt.getEntityManager().getEntities()) {
            // work on the real entity rather than its proxy
            instance = Entities.deproxy(instance);
            result.entity(instance.getId(), serializer.toString(newObjectMemento(instance)));
            for (Feed instanceAdjunct: ((EntityInternal)instance).feeds().getFeeds()) {
                result.feed(instanceAdjunct.getId(), serializer.toString(newObjectMemento(instanceAdjunct)));
            }
            for (Enricher instanceAdjunct: instance.getEnrichers()) {
                result.enricher(instanceAdjunct.getId(), serializer.toString(newObjectMemento(instanceAdjunct)));
            }
            for (Policy instanceAdjunct: instance.getPolicies()) {
                result.policy(instanceAdjunct.getId(), serializer.toString(newObjectMemento(instanceAdjunct)));
            }
        }
        for (CatalogItem<?,?> instance: mgmt.getCatalog().getCatalogItems()) {
            result.catalogItem(instance.getId(), serializer.toString(newObjectMemento(instance)));
        }

        return result.build();
    }

    /** generates and writes mementos for the given mgmt context to the given targetStore;
     * this may be taken from {@link MementoCopyMode#LOCAL} current state
     * or {@link MementoCopyMode#REMOTE} persisted state, or the default {@link MementoCopyMode#AUTO} detected
     * (a null or AUTO source resolves to LOCAL when this node's state is
     * {@link ManagementNodeState#MASTER}, and to REMOTE otherwise)
     */
    public static void writeMemento(ManagementContext mgmt, PersistenceObjectStore targetStore, MementoCopyMode source) {
        if (source==null || source==MementoCopyMode.AUTO) {
            source = (mgmt.getHighAvailabilityManager().getNodeState()==ManagementNodeState.MASTER
                ? MementoCopyMode.LOCAL : MementoCopyMode.REMOTE);
        }

        Stopwatch timer = Stopwatch.createStarted();

        BrooklynMementoRawData dataRecord = newStateMemento(mgmt, source);
        ManagementPlaneSyncRecord mgmtRecord = newManagerMemento(mgmt, source);

        writeMemento(mgmt, dataRecord, targetStore);
        writeManagerMemento(mgmt, mgmtRecord, targetStore);

        log.debug("Wrote full memento to "+targetStore+" in "+Time.makeTimeStringRounded(Duration.of(timer)));
    }

    /** The occasion on which a back-up is taken; {@link #toString()} is used as the back-up sub-path. */
    public static enum CreateBackupMode { PROMOTION, DEMOTION, CUSTOM;
        @Override public String toString() {
            // fixed locale: the default-locale form can change letters (e.g. dotless i under Turkish),
            // which would alter the sub-path this value is used for
            return super.toString().toLowerCase(java.util.Locale.ENGLISH);
        }
    }

    /**
     * Writes a back-up of the state and management-plane mementos to the configured back-up location.
     * <p>
     * A null or AUTO {@code source} is resolved from {@code mode}: REMOTE on promotion, LOCAL on demotion.
     * If writing to the configured back-up location fails and that location is neither blank nor
     * {@code "localhost"}, the back-up is retried against {@code "localhost"}. Non-fatal failures are
     * logged and not propagated.
     *
     * @throws IllegalArgumentException if the source must be detected and {@code mode} is neither
     *         PROMOTION nor DEMOTION
     */
    public static void createBackup(ManagementContext managementContext, CreateBackupMode mode, MementoCopyMode source) {
        if (source==null || source==MementoCopyMode.AUTO) {
            switch (mode) {
            case PROMOTION: source = MementoCopyMode.REMOTE; break;
            case DEMOTION: source = MementoCopyMode.LOCAL; break;
            default:
                throw new IllegalArgumentException("Cannot detect copy mode for "+mode+"/"+source);
            }
        }
        BrooklynMementoRawData memento = null;
        ManagementPlaneSyncRecord planeState = null;

        try {
            log.debug("Loading persisted state on "+mode+" for backup purposes");
            memento = newStateMemento(managementContext, source);
            try {
                planeState = newManagerMemento(managementContext, source);
            } catch (Exception e) {
                // the plane record is optional for a back-up; carry on without it
                Exceptions.propagateIfFatal(e);
                log.warn("Unable to access management plane sync state on "+mode+" (ignoring): "+e, e);
            }

            PersistenceObjectStore destinationObjectStore = null;
            String backupSpec = managementContext.getConfig().getConfig(BrooklynServerConfig.PERSISTENCE_BACKUPS_LOCATION_SPEC);
            String nonBackupSpec = managementContext.getConfig().getConfig(BrooklynServerConfig.PERSISTENCE_LOCATION_SPEC);
            try {
                destinationObjectStore = newBackupObjectStore(managementContext, backupSpec, nonBackupSpec, mode);
                log.debug("Backing up persisted state on "+mode+", to "+destinationObjectStore.getSummaryName());
                writeMemento(managementContext, memento, destinationObjectStore);
                writeManagerMemento(managementContext, planeState, destinationObjectStore);
                if (!memento.isEmpty()) {
                    log.info("Back-up of persisted state created on "+mode+", in "+destinationObjectStore.getSummaryName());
                } else {
                    log.debug("Back-up of (empty) persisted state created on "+mode+", in "+destinationObjectStore.getSummaryName());
                }

            } catch (Exception e) {
                Exceptions.propagateIfFatal(e);
                PersistenceObjectStore failedStore = destinationObjectStore;
                if (!Strings.isBlank(backupSpec) && !"localhost".equals(backupSpec)) {
                    String failedSpec = backupSpec;
                    backupSpec = "localhost";
                    destinationObjectStore = newBackupObjectStore(managementContext, backupSpec, nonBackupSpec, mode);
                    log.warn("Persisted state back-up to "+(failedStore!=null ? failedStore.getSummaryName() : failedSpec)
                        +" failed with "+e, e);

                    log.debug("Backing up persisted state on "+mode+", locally because remote failed, to "+destinationObjectStore.getSummaryName());
                    writeMemento(managementContext, memento, destinationObjectStore);
                    writeManagerMemento(managementContext, planeState, destinationObjectStore);
                    log.info("Back-up of persisted state created on "+mode+", locally because remote failed, in "+destinationObjectStore.getSummaryName());
                } else {
                    // there is no different location to retry against; previously this failure
                    // vanished without any log entry
                    log.warn("Persisted state back-up to "+(failedStore!=null ? failedStore.getSummaryName() : backupSpec)
                        +" failed with "+e+" (no alternative back-up location to try)", e);
                }
            }
        } catch (Exception e) {
            Exceptions.propagateIfFatal(e);
            log.warn("Unable to backup management plane sync state on "+mode+" (ignoring): "+e, e);
        }
    }

    /** Resolves the back-up container for the given spec and mode, and opens an object store on it. */
    private static PersistenceObjectStore newBackupObjectStore(ManagementContext managementContext,
            String backupSpec, String nonBackupSpec, CreateBackupMode mode) {
        String backupContainer = BrooklynServerPaths.newBackupPersistencePathResolver(managementContext)
            .location(backupSpec).nonBackupLocation(nonBackupSpec).resolveWithSubpathFor(managementContext, mode.toString());
        return newPersistenceObjectStore(managementContext, backupSpec, backupContainer);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/CatalogItemLibrariesConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/CatalogItemLibrariesConverter.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/CatalogItemLibrariesConverter.java deleted file mode 100644 index 1e53b41..0000000 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/CatalogItemLibrariesConverter.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.core.mgmt.rebind.persister; - -import java.util.ArrayList; -import java.util.Collection; - -import org.apache.brooklyn.api.catalog.CatalogItem.CatalogBundle; -import org.apache.brooklyn.api.catalog.CatalogItem.CatalogItemLibraries; -import org.apache.brooklyn.core.catalog.internal.CatalogBundleDto; - -import com.thoughtworks.xstream.converters.Converter; -import com.thoughtworks.xstream.converters.MarshallingContext; -import com.thoughtworks.xstream.converters.UnmarshallingContext; -import com.thoughtworks.xstream.io.HierarchicalStreamReader; -import com.thoughtworks.xstream.io.HierarchicalStreamWriter; - -/** - * Convert old-style rebind file formats to the latest version. - * The code is needed only during transition to the new version, can be removed after a while. - */ -@Deprecated -public class CatalogItemLibrariesConverter implements Converter { - - @Override - public boolean canConvert(@SuppressWarnings("rawtypes") Class type) { - return CatalogItemLibraries.class.isAssignableFrom(type) || - Collection.class.isAssignableFrom(type); - } - - @Override - public void marshal(Object source, HierarchicalStreamWriter writer, MarshallingContext context) { - context.convertAnother(source); - } - - @Override - public Object unmarshal(HierarchicalStreamReader reader, UnmarshallingContext context) { - Object obj = context.convertAnother(context.currentObject(), context.getRequiredType()); - if (CatalogItemLibraries.class.isAssignableFrom(context.getRequiredType())) { - CatalogItemLibraries libs = (CatalogItemLibraries)obj; - Collection<String> bundles = libs.getBundles(); - Collection<CatalogBundle> libraries = new ArrayList<CatalogBundle>(bundles.size()); - for (String url : bundles) { - libraries.add(new CatalogBundleDto(null, null, url)); - } - return libraries; - } else { - return obj; - } - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/FileBasedObjectStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/FileBasedObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/FileBasedObjectStore.java deleted file mode 100644 index bd83387..0000000 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/FileBasedObjectStore.java +++ /dev/null @@ -1,425 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.core.mgmt.rebind.persister; - -import static com.google.common.base.Preconditions.checkNotNull; -import static java.lang.String.format; - -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Date; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.api.mgmt.ManagementContext; -import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode; -import org.apache.brooklyn.core.server.BrooklynServerConfig; -import org.apache.brooklyn.util.collections.MutableList; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.internal.ssh.process.ProcessTool; -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.exceptions.FatalConfigurationRuntimeException; -import org.apache.brooklyn.util.io.FileUtil; -import org.apache.brooklyn.util.os.Os; -import org.apache.brooklyn.util.os.Os.DeletionResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.io.Files; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - -/** - * @author Andrea Turli - */ -public class FileBasedObjectStore implements PersistenceObjectStore { - - private static final Logger log = LoggerFactory.getLogger(FileBasedObjectStore.class); - - private static final int SHUTDOWN_TIMEOUT_MS = 10*1000; - - private final File basedir; - private final ListeningExecutorService executor; - private ManagementContext mgmt; - 
private boolean prepared = false; - private boolean deferredBackupNeeded = false; - private AtomicBoolean doneFirstContentiousWrite = new AtomicBoolean(false); - - /** - * @param basedir - */ - public FileBasedObjectStore(File basedir) { - this.basedir = checkPersistenceDirPlausible(basedir); - this.executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); - log.debug("File-based objectStore will use directory {}", basedir); - // don't check accessible yet, we do that when we prepare - } - - @Override - public String getSummaryName() { - return getBaseDir().getAbsolutePath(); - } - - public File getBaseDir() { - return basedir; - } - - public void prepareForMasterUse() { - if (doneFirstContentiousWrite.get()) - return; - synchronized (this) { - if (doneFirstContentiousWrite.get()) - return; - try { - if (deferredBackupNeeded) { - // defer backup and path creation until first write - // this way if node is standby or auto, the backup is not created superfluously - - File backup = backupDirByCopying(basedir); - log.info("Persistence deferred backup, directory "+basedir+" backed up to "+backup.getAbsolutePath()); - - deferredBackupNeeded = false; - } - } catch (Exception e) { - throw Exceptions.propagate(e); - } - doneFirstContentiousWrite.getAndSet(true); - } - } - - @Override - public void createSubPath(String subPath) { - if (!prepared) throw new IllegalStateException("Not yet prepared: "+this); - - File dir = new File(getBaseDir(), subPath); - if (dir.mkdir()) { - try { - FileUtil.setFilePermissionsTo700(dir); - } catch (IOException e) { - log.warn("Unable to set sub-directory permissions to 700 (continuing): "+dir); - } - } else { - if (!dir.exists()) - throw new IllegalStateException("Cannot create "+dir+"; call returned false"); - } - checkPersistenceDirAccessible(dir); - } - - @Override - public StoreObjectAccessor newAccessor(String path) { - if (!prepared) throw new IllegalStateException("Not yet prepared: "+this); - - String tmpExt = 
".tmp"; - if (mgmt!=null && mgmt.getManagementNodeId()!=null) tmpExt = "."+mgmt.getManagementNodeId()+tmpExt; - return new FileBasedStoreObjectAccessor(new File(Os.mergePaths(getBaseDir().getAbsolutePath(), path)), tmpExt); - } - - @Override - public List<String> listContentsWithSubPath(final String parentSubPath) { - if (!prepared) throw new IllegalStateException("Not yet prepared: "+this); - - Preconditions.checkNotNull(parentSubPath); - File subPathDir = new File(basedir, parentSubPath); - - FileFilter fileFilter = new FileFilter() { - @Override public boolean accept(File file) { - // An inclusion filter would be safer than exclusion - return !file.getName().endsWith(".tmp") && !file.getName().endsWith(".swp"); - } - }; - File[] subPathDirFiles = subPathDir.listFiles(fileFilter); - if (subPathDirFiles==null) return ImmutableList.<String>of(); - return FluentIterable.from(Arrays.asList(subPathDirFiles)) - .transform(new Function<File, String>() { - @Nullable - @Override - public String apply(@Nullable File input) { - return format("%s/%s", parentSubPath, input.getName()); - } - }).toList(); - } - - @Override - public void close() { - executor.shutdown(); - try { - executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw Exceptions.propagate(e); - } - } - - @Override - public String toString() { - return Objects.toStringHelper(this).add("basedir", basedir).toString(); - } - - @Override - public void injectManagementContext(ManagementContext mgmt) { - if (this.mgmt!=null && !this.mgmt.equals(mgmt)) - throw new IllegalStateException("Cannot change mgmt context of "+this); - this.mgmt = mgmt; - } - - @Override - public void prepareForSharedUse(@Nullable PersistMode persistMode, HighAvailabilityMode haMode) { - if (mgmt==null) throw new NullPointerException("Must inject ManagementContext before preparing "+this); - - if (persistMode==null || persistMode==PersistMode.DISABLED) { - // TODO is this check needed? 
shouldn't come here now without persistence on. - prepared = true; - return; - } - - @SuppressWarnings("deprecation") - Boolean backups = mgmt.getConfig().getConfig(BrooklynServerConfig.PERSISTENCE_BACKUPS_REQUIRED); - if (Boolean.TRUE.equals(backups)) { - log.warn("Using legacy backup for "+this+"; functionality will be removed in future versions, in favor of promotion/demotion-specific backups to a configurable backup location."); - } - // default backups behaviour here changed to false, Nov 2014, because these backups are now legacy; - // we prefer the made when persistence is enabled, using routines in BrooklynPersistenceUtils - if (backups==null) backups = false; - - File dir = getBaseDir(); - try { - String persistencePath = dir.getAbsolutePath(); - - switch (persistMode) { - case CLEAN: - if (dir.exists()) { - checkPersistenceDirAccessible(dir); - try { - if (backups) { - File old = backupDirByMoving(dir); - log.info("Persistence mode CLEAN, directory "+persistencePath+" backed up to "+old.getAbsolutePath()); - } else { - deleteCompletely(); - log.info("Persistence mode CLEAN, directory "+persistencePath+" deleted"); - } - } catch (IOException e) { - throw new FatalConfigurationRuntimeException("Error using existing persistence directory "+dir.getAbsolutePath(), e); - } - } else { - log.debug("Persistence mode CLEAN, directory "+persistencePath+", no previous state"); - } - break; - case REBIND: - checkPersistenceDirAccessible(dir); - checkPersistenceDirNonEmpty(dir); - try { - if (backups) { - if (haMode==HighAvailabilityMode.MASTER) { - File backup = backupDirByCopying(dir); - log.info("Persistence mode REBIND, directory "+persistencePath+" backed up to "+backup.getAbsolutePath()); - } else { - deferredBackupNeeded = true; - } - } - } catch (IOException e) { - throw new FatalConfigurationRuntimeException("Error backing up persistence directory "+dir.getAbsolutePath(), e); - } - break; - case AUTO: - if (dir.exists()) { - checkPersistenceDirAccessible(dir); 
- } - if (dir.exists() && !isMementoDirExistButEmpty(dir)) { - try { - if (backups) { - if (haMode==HighAvailabilityMode.MASTER) { - File backup = backupDirByCopying(dir); - log.info("Persistence mode REBIND, directory "+persistencePath+" backed up to "+backup.getAbsolutePath()); - } else { - deferredBackupNeeded = true; - } - } - } catch (IOException e) { - throw new FatalConfigurationRuntimeException("Error backing up persistence directory "+dir.getAbsolutePath(), e); - } - } else { - log.debug("Persistence mode AUTO, directory "+persistencePath+", no previous state"); - } - break; - default: - throw new FatalConfigurationRuntimeException("Unexpected persist mode "+persistMode+"; modified during initialization?!"); - }; - - if (!dir.exists()) { - boolean success = dir.mkdirs(); - if (success) { - FileUtil.setFilePermissionsTo700(dir); - } else { - throw new FatalConfigurationRuntimeException("Failed to create persistence directory "+dir); - } - } - - } catch (Exception e) { - throw Exceptions.propagate(e); - } - - prepared = true; - } - - protected File checkPersistenceDirPlausible(File dir) { - checkNotNull(dir, "directory"); - if (!dir.exists()) return dir; - if (dir.isFile()) throw new FatalConfigurationRuntimeException("Invalid persistence directory" + dir + ": must not be a file"); - if (!(dir.canRead() && dir.canWrite())) throw new FatalConfigurationRuntimeException("Invalid persistence directory" + dir + ": " + - (!dir.canRead() ? "not readable" : - (!dir.canWrite() ? "not writable" : "unknown reason"))); - return dir; - } - - protected void checkPersistenceDirAccessible(File dir) { - if (!(dir.exists() && dir.isDirectory() && dir.canRead() && dir.canWrite())) { - FatalConfigurationRuntimeException problem = new FatalConfigurationRuntimeException("Invalid persistence directory " + dir + ": " + - (!dir.exists() ? "does not exist" : - (!dir.isDirectory() ? "not a directory" : - (!dir.canRead() ? "not readable" : - (!dir.canWrite() ? 
"not writable" : "unknown reason"))))); - log.debug("Invalid persistence directory "+dir+" (rethrowing): "+problem, problem); - } else { - log.debug("Created dir {} for {}", dir, this); - } - } - - protected void checkPersistenceDirNonEmpty(File persistenceDir) { - FatalConfigurationRuntimeException problem; - if (!persistenceDir.exists()) { - problem = new FatalConfigurationRuntimeException("Invalid persistence directory "+persistenceDir+" because directory does not exist"); - log.debug("Invalid persistence directory "+persistenceDir+" (rethrowing): "+problem, problem); - throw problem; - } if (isMementoDirExistButEmpty(persistenceDir)) { - problem = new FatalConfigurationRuntimeException("Invalid persistence directory "+persistenceDir+" because directory is empty"); - log.debug("Invalid persistence directory "+persistenceDir+" (rethrowing): "+problem, problem); - throw problem; - } - } - - protected File backupDirByCopying(File dir) throws IOException, InterruptedException { - File parentDir = dir.getParentFile(); - String simpleName = dir.getName(); - String timestamp = new SimpleDateFormat("yyyyMMdd-hhmmssSSS").format(new Date()); - File backupDir = new File(parentDir, simpleName+"."+timestamp+".bak"); - - FileUtil.copyDir(dir, backupDir); - FileUtil.setFilePermissionsTo700(backupDir); - - return backupDir; - } - - protected File backupDirByMoving(File dir) throws InterruptedException, IOException { - File parentDir = dir.getParentFile(); - String simpleName = dir.getName(); - String timestamp = new SimpleDateFormat("yyyyMMdd-hhmmssSSS").format(new Date()); - File newDir = new File(parentDir, simpleName+"."+timestamp+".bak"); - - FileUtil.moveDir(dir, newDir); - return newDir; - } - - private static boolean WARNED_ON_NON_ATOMIC_FILE_UPDATES = false; - /** - * Attempts an fs level atomic move then fall back to pure java rename. - * Assumes files are on same mount point. - * <p> - * TODO Java 7 gives an atomic Files.move() which would be preferred. 
- */ - static void moveFile(File srcFile, File destFile) throws IOException, InterruptedException { - // Try rename first - it is a *much* cheaper call than invoking a system call in Java. - // However, rename is not guaranteed cross platform to succeed if the destination exists, - // and not guaranteed to be atomic, but it usually seems to do the right thing... - boolean result; - result = srcFile.renameTo(destFile); - if (result) { - if (log.isTraceEnabled()) log.trace("java rename of {} to {} completed", srcFile, destFile); - return; - } - - if (!Os.isMicrosoftWindows()) { - // this command, if it succeeds, is guaranteed to be atomic, and it will usually overwrite - String cmd = "mv '"+srcFile.getAbsolutePath()+"' '"+destFile.getAbsolutePath()+"'"; - - int exitStatus = new ProcessTool().execCommands(MutableMap.<String,String>of(), MutableList.of(cmd), null); - // prefer the above to the below because it wraps it in the appropriate bash -// Process proc = Runtime.getRuntime().exec(cmd); -// result = proc.waitFor(); - - if (log.isTraceEnabled()) log.trace("FS move of {} to {} completed, code {}", new Object[] { srcFile, destFile, exitStatus }); - if (exitStatus == 0) return; - } - - // finally try a delete - but explicitly warn this is not going to be atomic - // so if another node reads it might see no master - if (!WARNED_ON_NON_ATOMIC_FILE_UPDATES) { - WARNED_ON_NON_ATOMIC_FILE_UPDATES = true; - log.warn("Unable to perform atomic file update ("+srcFile+" to "+destFile+"); file system not recommended for production HA/DR"); - } - destFile.delete(); - result = srcFile.renameTo(destFile); - if (log.isTraceEnabled()) log.trace("java delete and rename of {} to {} completed, code {}", new Object[] { srcFile, destFile, result }); - if (result) - return; - Files.copy(srcFile, destFile); - srcFile.delete(); - throw new IOException("Could not move "+destFile+" to "+srcFile); - } - - /** - * True if directory exists, but is entirely empty, or only contains empty 
directories. - */ - static boolean isMementoDirExistButEmpty(String dir) { - return isMementoDirExistButEmpty(new File(dir)); - } - - static boolean isMementoDirExistButEmpty(File dir) { - if (!dir.exists()) return false; - File[] contents = dir.listFiles(); - if (contents == null) return false; - - for (File sub : contents) { - if (sub.isFile()) return false; - if (sub.isDirectory() && sub.listFiles().length > 0) return false; - } - return true; - } - - @Override - public void deleteCompletely() { - deleteCompletely(getBaseDir()); - } - - public static void deleteCompletely(File d) { - DeletionResult result = Os.deleteRecursively(d); - if (!result.wasSuccessful()) - log.warn("Unable to delete persistence dir "+d); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/FileBasedStoreObjectAccessor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/FileBasedStoreObjectAccessor.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/FileBasedStoreObjectAccessor.java deleted file mode 100644 index 55af3bc..0000000 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/FileBasedStoreObjectAccessor.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.rebind.persister; - -import java.io.File; -import java.io.IOException; -import java.util.Date; - -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.io.FileUtil; -import org.apache.brooklyn.util.text.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Charsets; -import com.google.common.base.Objects; -import com.google.common.base.Throwables; -import com.google.common.io.Files; - -/** - * Reads/writes to a file. This impl does it immediately, with no synchronisation. - * Callers should wrap in {@link StoreObjectAccessorLocking} if multiple threads may be accessing this. - * - * @author aled - */ -public class FileBasedStoreObjectAccessor implements PersistenceObjectStore.StoreObjectAccessor { - private static final Logger LOG = LoggerFactory.getLogger(FileBasedStoreObjectAccessor.class); - - public FileBasedStoreObjectAccessor(File file, String tmpExtension) { - this.file = file; - this.tmpFile = new File(file.getParentFile(), file.getName()+(Strings.isBlank(tmpExtension) ? 
".tmp" : tmpExtension)); - } - - private final File file; - private final File tmpFile; - - @Override - public String get() { - try { - if (!exists()) return null; - return Files.asCharSource(file, Charsets.UTF_8).read(); - } catch (IOException e) { - throw Throwables.propagate(e); - } - } - - @Override - public byte[] getBytes() { - try { - if (!exists()) return null; - return Files.asByteSource(file).read(); - } catch (IOException e) { - throw Throwables.propagate(e); - } - } - - @Override - public boolean exists() { - return file.exists(); - } - - // Setting permissions to 600 reduces objectAccessor.put performance from about 5000 per second to 3000 per second - // in java 6. With Java 7's Files.setPosixFilePermissions, this might well improve. - @Override - public void put(String val) { - try { - if (val==null) val = ""; - FileUtil.setFilePermissionsTo600(tmpFile); - Files.write(val, tmpFile, Charsets.UTF_8); - FileBasedObjectStore.moveFile(tmpFile, file); - } catch (IOException e) { - throw Exceptions.propagate(e); - } catch (InterruptedException e) { - throw Exceptions.propagate(e); - } - } - - @Override - public void append(String val) { - try { - if (val==null) val = ""; - FileUtil.setFilePermissionsTo600(file); - Files.append(val, file, Charsets.UTF_8); - - } catch (IOException e) { - throw Exceptions.propagate(e); - } - } - - @Override - public void delete() { - if (!file.delete()) { - if (!file.exists()) { - LOG.debug("Unable to delete " + file.getAbsolutePath() + ". Probably did not exist."); - } else { - LOG.warn("Unable to delete " + file.getAbsolutePath() + ". Probably still locked."); - } - } - if (tmpFile.exists() && !tmpFile.delete()) { - // tmpFile is probably already deleted, so don't even log debug if it does not exist - LOG.warn("Unable to delete " + tmpFile.getAbsolutePath() + ". 
Probably still locked."); - } - } - - @Override - public Date getLastModifiedDate() { - long result = file.lastModified(); - if (result==0) return null; - return new Date(result); - } - - @Override - public String toString() { - return Objects.toStringHelper(this).add("file", file).toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/LocationWithObjectStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/LocationWithObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/LocationWithObjectStore.java deleted file mode 100644 index a3fcab2..0000000 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/LocationWithObjectStore.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.core.mgmt.rebind.persister; - -/** Marker interface for locations which can create a {@link PersistenceObjectStore} */ -public interface LocationWithObjectStore { - - /** Creates a {@link PersistenceObjectStore} pointed at the given container/directory. */ - public PersistenceObjectStore newPersistenceObjectStore(String container); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/MementoSerializer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/MementoSerializer.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/MementoSerializer.java deleted file mode 100644 index 54680c6..0000000 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/MementoSerializer.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.core.mgmt.rebind.persister; - -import org.apache.brooklyn.api.mgmt.ha.ManagementNodeSyncRecord; -import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMemento; -import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister.LookupContext; - -/** Serializes the given object; it is often used with {@link BrooklynMemento} for persisting and restoring, - * though it can be used for any object (and is also used for the {@link ManagementNodeSyncRecord} instances) */ -public interface MementoSerializer<T> { - - public static final MementoSerializer<String> NOOP = new MementoSerializer<String>() { - @Override - public String toString(String memento) { - return memento; - } - @Override - public String fromString(String string) { - return string; - } - @Override - public void setLookupContext(LookupContext lookupContext) { - // no-op - } - @Override - public void unsetLookupContext() { - // no-op - } - }; - - String toString(T memento); - T fromString(String string); - void setLookupContext(LookupContext lookupContext); - void unsetLookupContext(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/PersistMode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/PersistMode.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/PersistMode.java deleted file mode 100644 index 0056bd2..0000000 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/PersistMode.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.rebind.persister; - -public enum PersistMode { - DISABLED, - AUTO, - REBIND, - CLEAN; -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/PersistenceActivityMetrics.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/PersistenceActivityMetrics.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/PersistenceActivityMetrics.java deleted file mode 100644 index 8c36a79..0000000 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/PersistenceActivityMetrics.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.rebind.persister; - -import java.util.List; -import java.util.Map; - -import org.apache.brooklyn.util.collections.MutableList; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.time.Duration; - -public class PersistenceActivityMetrics { - - final static int MAX_ERRORS = 200; - - long count=0, failureCount=0; - Long lastSuccessTime, lastDuration, lastFailureTime; - List<Map<String,Object>> errorMessages = MutableList.of(); - - public void noteSuccess(Duration duration) { - count++; - lastSuccessTime = System.currentTimeMillis(); - lastDuration = duration.toMilliseconds(); - } - - public void noteFailure(Duration duration) { - count++; - failureCount++; - lastFailureTime = System.currentTimeMillis(); - lastDuration = duration!=null ? 
duration.toMilliseconds() : -1; - } - - public void noteError(String error) { - noteErrorObject(error); - } - - public void noteError(List<?> error) { - noteErrorObject(error); - } - - /** error should be json-serializable; exceptions can be problematic */ - protected synchronized void noteErrorObject(Object error) { - errorMessages.add(0, MutableMap.<String,Object>of("error", error, "timestamp", System.currentTimeMillis())); - while (errorMessages.size() > MAX_ERRORS) { - errorMessages.remove(errorMessages.size()-1); - } - } - - public synchronized Map<String,Object> asMap() { - Map<String,Object> result = MutableMap.of(); - result.put("count", count); - result.put("lastSuccessTimeUtc", lastSuccessTime); - result.put("lastSuccessTimeMillisSince", since(lastSuccessTime)); - result.put("lastDuration", lastDuration); - result.put("failureCount", failureCount); - result.put("lastFailureTimeUtc", lastFailureTime); - result.put("lastFailureTimeMillisSince", since(lastFailureTime)); - result.put("errorMessages", MutableList.copyOf(errorMessages)); - return result; - } - - private Long since(Long time) { - if (time==null) return null; - return System.currentTimeMillis() - time; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/PersistenceObjectStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/PersistenceObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/PersistenceObjectStore.java deleted file mode 100644 index 16bf543..0000000 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/PersistenceObjectStore.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.core.mgmt.rebind.persister;

import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;

import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
import org.apache.brooklyn.util.time.Duration;

import com.google.common.annotations.Beta;

/**
 * Interface for working with persistence targets, including file system and jclouds object stores.
 * @author Andrea Turli
 */
public interface PersistenceObjectStore {

    /** Accessor to an object/item in a {@link PersistenceObjectStore}. */
    public interface StoreObjectAccessor {
        /** gets the object, or null if not found */
        String get();
        /** gets the object's contents as bytes */
        byte[] getBytes();
        /** whether the object exists */
        boolean exists();
        /** replaces the object's contents, creating the object if needed */
        void put(String contentsToReplaceOrCreate);
        /** appends to the object's contents, creating the object if needed */
        void append(String contentsToAppendOrCreate);
        /** deletes the object */
        void delete();
        // NB: creation date is available for many blobstores but
        // not on java.io.File and filesystems, so it is not included here
        /** last modified date, null if not supported or does not exist */
        Date getLastModifiedDate();
    }
    /** A {@link StoreObjectAccessor} whose operations are guarded by a {@link ReadWriteLock}. */
    public interface StoreObjectAccessorWithLock extends StoreObjectAccessor {
        /** Waits for all currently scheduled write lock operations (puts, appends, and deletes) to complete,
         * but does not wait on or prevent subsequent modifications.
         * This is suitable for a model where the caller is managing synchronization.
         * <p>
         * For more complex uses, readers should <code>getLockObject().readLock().lockInterruptibly()</code>
         * and ensure they subsequently <code>unlock()</code> it, of course. See {@link #getLockObject()}. */
        void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException;

        /** returns the underlying lock in case callers need more complex synchronization control */
        ReadWriteLock getLockObject();
    }

    /** human-readable name of this object store */
    public String getSummaryName();

    /**
     * Allows an object store to be created ahead of time, and a mgmt context injected afterwards.
     * Currently subsequent changes are not permitted.
     * <p>
     * A {@link ManagementContext} must be supplied via constructor or this method before invoking other methods.
     */
    @Beta
    public void injectManagementContext(ManagementContext managementContext);

    /**
     * Prepares the persistence store for read use and non-contentious write use,
     * in particular detecting whether we should clean or register a need for backup etc.
     * Typically called early in the setup lifecycle, after {@link #injectManagementContext(ManagementContext)},
     * but before {@link #prepareForMasterUse()}.
     * <p>
     * See {@link #prepareForMasterUse()} for discussion of "contentious writes".
     */
    @Beta
    public void prepareForSharedUse(PersistMode persistMode, HighAvailabilityMode highAvailabilityMode);

    /**
     * Prepares the persistence store for "contentious writes".
     * These are defined as those writes which might overwrite important information.
     * Implementations usually perform backup/versioning of the store if required.
     * <p>
     * Caller must call {@link #prepareForSharedUse(PersistMode, HighAvailabilityMode)} first
     * (and {@link #injectManagementContext(ManagementContext)} before that).
     * <p>
     * This is typically invoked "at the last moment", e.g. just before any such write,
     * mainly in order to prevent backups being made unnecessarily (e.g. if a node is standby,
     * or if it tries to become master but is not capable),
     * but also to prevent simultaneous backups which can cause problems with some stores
     * (only a mgmt node which knows it is the master should invoke this).
     **/
    @Beta
    public void prepareForMasterUse();

    /**
     * For reading/writing data to the item at the given path.
     * Note that the accessor is not generally thread safe, usually does not support blocking,
     * and multiple instances may conflict with each other.
     * <p>
     * Clients should wrap in a dedicated {@link StoreObjectAccessorLocking} and share
     * if multiple threads may be accessing the store.
     * This method may be changed in future to allow access to a shared locking accessor.
     */
    @Beta
    // TODO requiring clients to wrap and cache accessors is not very nice API,
    // better would be to do caching here probably,
    // but we've already been doing it this way above for now (Jun 2014)
    StoreObjectAccessor newAccessor(String path);

    /** create the directory at the given subPath relative to the base of this store */
    void createSubPath(String subPath);

    /**
     * Lists the paths of objects contained at the given path, including the subpath.
     * For example, if a file-based ObjectStore is configured to write to file://path/to/root/
     * then parentSubPath=entities would return the contents of /path/to/root/entities/, such as
     * [entities/e1, entities/e2, entities/e3].
     * The returned path values are usable in calls to {@link #newAccessor(String)}.
     */
    List<String> listContentsWithSubPath(String subPath);

    /** Entirely delete the contents of this persistence location.
     * Use with care, primarily in tests. This will recursively wipe the indicated location. */
    public void deleteCompletely();

    /**
     * Closes all resources used by this ObjectStore. No subsequent calls should be made to the ObjectStore;
     * behaviour of such calls is undefined but likely to throw exceptions.
     */
    void close();

}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/RetryingMementoSerializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/RetryingMementoSerializer.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/RetryingMementoSerializer.java
deleted file mode 100644
index 86fc359..0000000
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/RetryingMementoSerializer.java
+++ /dev/null
@@ -1,95 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
- */ -package org.apache.brooklyn.core.mgmt.rebind.persister; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister.LookupContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RetryingMementoSerializer<T> implements MementoSerializer<T> { - - private static final Logger LOG = LoggerFactory.getLogger(RetryingMementoSerializer.class); - - private final MementoSerializer<T> delegate; - private final int maxAttempts; - - public RetryingMementoSerializer(MementoSerializer<T> delegate, int maxAttempts) { - this.delegate = checkNotNull(delegate, "delegate"); - this.maxAttempts = maxAttempts; - if (maxAttempts < 1) throw new IllegalArgumentException("Max attempts must be at least 1, but was "+maxAttempts); - } - - @Override - public String toString(T memento) { - RuntimeException lastException = null; - int attempt = 0; - do { - attempt++; - try { - String result = delegate.toString(memento); - if (attempt>1) - LOG.info("Success following previous serialization error"); - return result; - } catch (RuntimeException e) { - LOG.warn("Error serializing memento (attempt "+attempt+" of "+maxAttempts+") for "+memento+ - "; expected sometimes if attribute value modified", e); - lastException = e; - } - } while (attempt < maxAttempts); - - throw lastException; - } - - @Override - public T fromString(String string) { - if (string==null) - return null; - - RuntimeException lastException = null; - int attempt = 0; - do { - attempt++; - try { - T result = delegate.fromString(string); - if (attempt>1) - LOG.info("Success following previous deserialization error, got: "+result); - return result; - } catch (RuntimeException e) { - // trying multiple times only makes sense for a few errors (namely ConcModExceptions); perhaps deprecate that strategy? 
- LOG.warn("Error deserializing memento (attempt "+attempt+" of "+maxAttempts+"): "+e, e); - if (attempt==1) LOG.debug("Memento which was not deserialized is:\n"+string); - lastException = e; - } - } while (attempt < maxAttempts); - - throw lastException; - } - - @Override - public void setLookupContext(LookupContext lookupContext) { - delegate.setLookupContext(lookupContext); - } - - @Override - public void unsetLookupContext() { - delegate.unsetLookupContext(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/StoreObjectAccessorLocking.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/StoreObjectAccessorLocking.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/StoreObjectAccessorLocking.java deleted file mode 100644 index a8517e3..0000000 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/StoreObjectAccessorLocking.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
 */
package org.apache.brooklyn.core.mgmt.rebind.persister;

import java.util.Comparator;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceObjectStore.StoreObjectAccessor;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.javalang.JavaClassNames;
import org.apache.brooklyn.util.time.Duration;

/** Wraps access to an object (the delegate {@link StoreObjectAccessor})
 * in a guarded read-write context such that callers will be blocked if another thread
 * is accessing the object in an incompatible way (e.g. trying to read when someone is writing).
 * See {@link ReadWriteLock}.
 * <p>
 * This has no visibility or control over other access to the delegate or underlying object, of course.
 * It can only affect callers coming through this wrapper instance. Thus callers must share instances
 * of this class for a given item.
 * <p>
 * No locking is done with respect to {@link #getLastModifiedDate()}.
 **/
public class StoreObjectAccessorLocking implements PersistenceObjectStore.StoreObjectAccessorWithLock {

    /** Orders threads by {@link Thread#getId()}, so that they can be held in a sorted set. */
    protected static class ThreadComparator implements Comparator<Thread> {
        @Override
        public int compare(Thread o1, Thread o2) {
            if (o1.getId()<o2.getId()) return -1;
            if (o1.getId()>o2.getId()) return 1;
            return 0;
        }
    }

    // constructed with fair=true
    ReadWriteLock lock = new ReentrantReadWriteLock(true);
    // threads currently inside get/getBytes/exists (waiting for, or holding, the read lock)
    Set<Thread> queuedReaders = new ConcurrentSkipListSet<Thread>(new ThreadComparator());
    // threads inside put/delete which have not yet acquired the write lock
    Set<Thread> queuedWriters = new ConcurrentSkipListSet<Thread>(new ThreadComparator());

    final PersistenceObjectStore.StoreObjectAccessor delegate;

    /** @param delegate the accessor which all calls are forwarded to once the appropriate lock is held */
    public StoreObjectAccessorLocking(PersistenceObjectStore.StoreObjectAccessor delegate) {
        this.delegate = delegate;
    }

    /** Calls the delegate's {@code get()} while holding the read lock. */
    @Override
    public String get() {
        try {
            queuedReaders.add(Thread.currentThread());
            lock.readLock().lockInterruptibly();
            try {
                return delegate.get();

            } finally {
                lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        } finally {
            queuedReaders.remove(Thread.currentThread());
        }
    }

    /** Calls the delegate's {@code getBytes()} while holding the read lock. */
    @Override
    public byte[] getBytes() {
        try {
            queuedReaders.add(Thread.currentThread());
            lock.readLock().lockInterruptibly();
            try {
                return delegate.getBytes();

            } finally {
                lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        } finally {
            queuedReaders.remove(Thread.currentThread());
        }
    }

    /** Calls the delegate's {@code exists()} while holding the read lock. */
    @Override
    public boolean exists() {
        try {
            queuedReaders.add(Thread.currentThread());
            lock.readLock().lockInterruptibly();
            try {
                return delegate.exists();

            } finally {
                lock.readLock().unlock();
            }
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        } finally {
            queuedReaders.remove(Thread.currentThread());
        }
    }

    /** True if some other put/delete is queued and no reader is registered;
     * used by the write operations to decide that their own write can be skipped. */
    protected boolean hasScheduledPutOrDeleteWithNoRead() {
        // skip write if there is another write queued and no reader waiting
        return (!queuedWriters.isEmpty() && queuedReaders.isEmpty());
    }

    /** Calls the delegate's {@code put} while holding the write lock,
     * unless {@link #hasScheduledPutOrDeleteWithNoRead()} says the write can be skipped. */
    @Override
    public void put(String val) {
        try {
            queuedWriters.add(Thread.currentThread());
            lock.writeLock().lockInterruptibly();
            try {
                // we now hold the lock, so we no longer count as queued (the check below looks for *other* writers)
                queuedWriters.remove(Thread.currentThread());
                if (hasScheduledPutOrDeleteWithNoRead())
                    // don't bother writing if someone will write after us and no one is reading
                    return;
                delegate.put(val);

            } finally {
                lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        } finally {
            // also covers the case where we were interrupted while waiting for the lock
            queuedWriters.remove(Thread.currentThread());
        }
    }

    /** Calls the delegate's {@code append} while holding the write lock,
     * unless {@link #hasScheduledPutOrDeleteWithNoRead()} says the write can be skipped.
     * Unlike put and delete, this does not register the calling thread in {@code queuedWriters}. */
    @Override
    public void append(String val) {
        try {
            lock.writeLock().lockInterruptibly();
            try {
                if (hasScheduledPutOrDeleteWithNoRead())
                    // don't bother appending if someone will write after us and no one is reading
                    return;

                delegate.append(val);

            } finally {
                lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
    }

    /** Calls the delegate's {@code delete} while holding the write lock,
     * unless {@link #hasScheduledPutOrDeleteWithNoRead()} says the write can be skipped. */
    @Override
    public void delete() {
        try {
            queuedWriters.add(Thread.currentThread());
            lock.writeLock().lockInterruptibly();
            try {
                // we now hold the lock, so we no longer count as queued (the check below looks for *other* writers)
                queuedWriters.remove(Thread.currentThread());
                if (hasScheduledPutOrDeleteWithNoRead()) {
                    // don't bother deleting if someone will write after us and no one is reading
                    return;
                }
                delegate.delete();

            } finally {
                lock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        } finally {
            // also covers the case where we were interrupted while waiting for the lock
            queuedWriters.remove(Thread.currentThread());
        }
    }

    /** Waits up to the given timeout to acquire the read lock, releasing it again immediately.
     * Note that an {@link InterruptedException} raised while waiting is caught here and
     * rethrown via {@code Exceptions.propagate}.
     *
     * @throws TimeoutException if the read lock could not be acquired within the timeout */
    @Override
    public void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException {
        try {
            boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS);
            if (locked) {
                lock.readLock().unlock();
            } else {
                throw new TimeoutException("Timeout waiting for writes of "+delegate+" after "+timeout);
            }
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
    }

    /** Forwards straight to the delegate, without taking any lock. */
    @Override
    public Date getLastModifiedDate() {
        return delegate.getLastModifiedDate();
    }

    @Override
    public ReadWriteLock getLockObject() {
        return lock;
    }

    @Override
    public String toString() {
        return JavaClassNames.simpleClassName(this)+":"+delegate.toString();
    }
}
