http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java new file mode 100644 index 0000000..aa83e14 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.mgmt.persist; + +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReadWriteLock; + +import org.apache.brooklyn.api.mgmt.ManagementContext; +import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode; +import org.apache.brooklyn.util.time.Duration; + +import com.google.common.annotations.Beta; + +/** + * Interface for working with persistence targets, including file system and jclouds object stores. + * @author Andrea Turli + */ +public interface PersistenceObjectStore { + + /** accessor to an object/item in a {@link PersistenceObjectStore} */ + public interface StoreObjectAccessor { + /** gets the object, or null if not found */ + String get(); + byte[] getBytes(); + boolean exists(); + void put(String contentsToReplaceOrCreate); + void append(String contentsToAppendOrCreate); + void delete(); + // NB: creation date is available for many blobstores but + // not on java.io.File and filesystems, so it is not included here + /** last modified date, null if not supported or does not exist */ + Date getLastModifiedDate(); + } + public interface StoreObjectAccessorWithLock extends StoreObjectAccessor { + /** waits for all currently scheduled write lock operations (puts, appends, and deletes) to complete; + * but does not wait on or prevent subsequent modifications. + * this is suitable for a model where the caller is managing synchronization. + * <p> + * for more complex uses, readers should <code>getLockObject().readLock().lockInterruptibly()</code> + * and ensure they subsequently <code>unlock()</code> it of course. see {@link #getLockObject()}. */ + void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException; + + /** returns the underlying lock in case callers need more complex synchronization control */ + ReadWriteLock getLockObject(); + } + + /** human-readable name of this object store */ + public String getSummaryName(); + + /** + * Allows a way for an object store to be created ahead of time, and a mgmt context injected. + * Currently subsequent changes are not permitted. + * <p> + * A {@link ManagementContext} must be supplied via constructor or this method before invoking other methods. + */ + @Beta + public void injectManagementContext(ManagementContext managementContext); + + /** + * Prepares the persistence store for read use and non-contentious write use, + * in particular detecting whether we should clean or register a need for backup etc. + * Typically called early in the setup lifecycle, after {@link #injectManagementContext(ManagementContext)}, + * but before {@link #prepareForMasterUse()}. + * <p> + * See {@link #prepareForMasterUse()} for discussion of "contentious writes". + */ + @Beta + public void prepareForSharedUse(PersistMode persistMode, HighAvailabilityMode highAvailabilityMode); + + /** + * Prepares the persistence store for "contentious writes". + * These are defined as those writes which might overwrite important information. + * Implementations usually perform backup/versioning of the store if required. + * <p> + * Caller must call {@link #prepareForSharedUse(PersistMode, HighAvailabilityMode)} first + * (and {@link #injectManagementContext(ManagementContext)} before that). + * <p> + * This is typically invoked "at the last moment" e.g. before the any such write, + * mainly in order to prevent backups being made unnecessarily (e.g. if a node is standby, + * or if it tries to become master but is not capable), + * but also to prevent simultaneous backups which can cause problems with some stores + * (only a mgmt who knows he is the master should invoke this). + **/ + @Beta + public void prepareForMasterUse(); + + /** + * For reading/writing data to the item at the given path. + * Note that the accessor is not generally thread safe, usually does not support blocking, + * and multiple instances may conflict with each other. + * <p> + * Clients should wrap in a dedicated {@link StoreObjectAccessorLocking} and share + * if multiple threads may be accessing the store. + * This method may be changed in future to allow access to a shared locking accessor. + */ + @Beta + // TODO requiring clients to wrap and cache accessors is not very nice API, + // better would be to do caching here probably, + // but we've already been doing it this way above for now (Jun 2014) + StoreObjectAccessor newAccessor(String path); + + /** create the directory at the given subPath relative to the base of this store */ + void createSubPath(String subPath); + + /** + * Lists the paths of objects contained at the given path, including the subpath. + * For example, if a file-based ObjectStore is configured to write to file://path/to/root/ + * then parentSubPath=entities would return the contents of /path/to/root/entities/, such as + * [entities/e1, entities/e2, entities/e3]. + * The returned paths values are usable in calls to {@link #newAccessor(String)}. + */ + List<String> listContentsWithSubPath(String subPath); + + /** Entirely delete the contents of this persistence location. + * Use with care, primarily in tests. This will recursively wipe the indicated location. */ + public void deleteCompletely(); + + /** + * Closes all resources used by this ObjectStore. No subsequent calls should be made to the ObjectStore; + * behaviour of such calls is undefined but likely to throw exceptions. + */ + void close(); + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/RetryingMementoSerializer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/RetryingMementoSerializer.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/RetryingMementoSerializer.java new file mode 100644 index 0000000..480a2ec --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/RetryingMementoSerializer.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.mgmt.persist; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister.LookupContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RetryingMementoSerializer<T> implements MementoSerializer<T> { + + private static final Logger LOG = LoggerFactory.getLogger(RetryingMementoSerializer.class); + + private final MementoSerializer<T> delegate; + private final int maxAttempts; + + public RetryingMementoSerializer(MementoSerializer<T> delegate, int maxAttempts) { + this.delegate = checkNotNull(delegate, "delegate"); + this.maxAttempts = maxAttempts; + if (maxAttempts < 1) throw new IllegalArgumentException("Max attempts must be at least 1, but was "+maxAttempts); + } + + @Override + public String toString(T memento) { + RuntimeException lastException = null; + int attempt = 0; + do { + attempt++; + try { + String result = delegate.toString(memento); + if (attempt>1) + LOG.info("Success following previous serialization error"); + return result; + } catch (RuntimeException e) { + LOG.warn("Error serializing memento (attempt "+attempt+" of "+maxAttempts+") for "+memento+ + "; expected sometimes if attribute value modified", e); + lastException = e; + } + } while (attempt < maxAttempts); + + throw lastException; + } + + @Override + public T fromString(String string) { + if (string==null) + return null; + + RuntimeException lastException = null; + int attempt = 0; + do { + attempt++; + try { + T result = delegate.fromString(string); + if (attempt>1) + LOG.info("Success following previous deserialization error, got: "+result); + return result; + } catch (RuntimeException e) { + // trying multiple times only makes sense for a few errors (namely ConcModExceptions); perhaps deprecate that strategy? + LOG.warn("Error deserializing memento (attempt "+attempt+" of "+maxAttempts+"): "+e, e); + if (attempt==1) LOG.debug("Memento which was not deserialized is:\n"+string); + lastException = e; + } + } while (attempt < maxAttempts); + + throw lastException; + } + + @Override + public void setLookupContext(LookupContext lookupContext) { + delegate.setLookupContext(lookupContext); + } + + @Override + public void unsetLookupContext() { + delegate.unsetLookupContext(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java new file mode 100644 index 0000000..302121f --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.mgmt.persist; + +import java.util.Comparator; +import java.util.Date; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.brooklyn.core.mgmt.persist.PersistenceObjectStore.StoreObjectAccessor; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.javalang.JavaClassNames; +import org.apache.brooklyn.util.time.Duration; + +/** Wraps access to an object (the delegate {@link StoreObjectAccessor} + * in a guarded read-write context such that callers will be blocked if another thread + * is accessing the object in an incompatible way (e.g. trying to read when someone is writing). + * See {@link ReadWriteLock}. + * <p> + * This has no visibility or control over other access to the delegate or underlying object, of course. + * It can only affect callers coming through this wrapper instance. Thus callers must share instances + * of this class for a given item. + * <p> + * No locking is done with respect to {@link #getLastModifiedDate()}. + **/ +public class StoreObjectAccessorLocking implements PersistenceObjectStore.StoreObjectAccessorWithLock { + + protected static class ThreadComparator implements Comparator<Thread> { + @Override + public int compare(Thread o1, Thread o2) { + if (o1.getId()<o2.getId()) return -1; + if (o1.getId()>o2.getId()) return 1; + return 0; + } + } + + ReadWriteLock lock = new ReentrantReadWriteLock(true); + Set<Thread> queuedReaders = new ConcurrentSkipListSet<Thread>(new ThreadComparator()); + Set<Thread> queuedWriters = new ConcurrentSkipListSet<Thread>(new ThreadComparator()); + + final PersistenceObjectStore.StoreObjectAccessor delegate; + + public StoreObjectAccessorLocking(PersistenceObjectStore.StoreObjectAccessor delegate) { + this.delegate = delegate; + } + + @Override + public String get() { + try { + queuedReaders.add(Thread.currentThread()); + lock.readLock().lockInterruptibly(); + try { + return delegate.get(); + + } finally { + lock.readLock().unlock(); + } + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } finally { + queuedReaders.remove(Thread.currentThread()); + } + } + + @Override + public byte[] getBytes() { + try { + queuedReaders.add(Thread.currentThread()); + lock.readLock().lockInterruptibly(); + try { + return delegate.getBytes(); + + } finally { + lock.readLock().unlock(); + } + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } finally { + queuedReaders.remove(Thread.currentThread()); + } + } + + @Override + public boolean exists() { + try { + queuedReaders.add(Thread.currentThread()); + lock.readLock().lockInterruptibly(); + try { + return delegate.exists(); + + } finally { + lock.readLock().unlock(); + } + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } finally { + queuedReaders.remove(Thread.currentThread()); + } + } + + protected boolean hasScheduledPutOrDeleteWithNoRead() { + // skip write if there is another write queued and no reader waiting + return (!queuedWriters.isEmpty() && queuedReaders.isEmpty()); + } + + @Override + public void put(String val) { + try { + queuedWriters.add(Thread.currentThread()); + lock.writeLock().lockInterruptibly(); + try { + queuedWriters.remove(Thread.currentThread()); + if (hasScheduledPutOrDeleteWithNoRead()) + // don't bother writing if someone will write after us and no one is reading + return; + delegate.put(val); + + } finally { + lock.writeLock().unlock(); + } + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } finally { + queuedWriters.remove(Thread.currentThread()); + } + } + + @Override + public void append(String val) { + try { + lock.writeLock().lockInterruptibly(); + try { + if (hasScheduledPutOrDeleteWithNoRead()) + // don't bother appending if someone will write after us and no one is reading + return; + + delegate.append(val); + + } finally { + lock.writeLock().unlock(); + } + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + } + + @Override + public void delete() { + try { + queuedWriters.add(Thread.currentThread()); + lock.writeLock().lockInterruptibly(); + try { + queuedWriters.remove(Thread.currentThread()); + if (hasScheduledPutOrDeleteWithNoRead()) { + // don't bother deleting if someone will write after us and no one is reading + return; + } + delegate.delete(); + + } finally { + lock.writeLock().unlock(); + } + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } finally { + queuedWriters.remove(Thread.currentThread()); + } + } + + @Override + public void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException { + try { + boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS); + if (locked) { + lock.readLock().unlock(); + } else { + throw new TimeoutException("Timeout waiting for writes of "+delegate+" after "+timeout); + } + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + } + + @Override + public Date getLastModifiedDate() { + return delegate.getLastModifiedDate(); + } + + @Override + public ReadWriteLock getLockObject() { + return lock; + } + + @Override + public String toString() { + return JavaClassNames.simpleClassName(this)+":"+delegate.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializer.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializer.java new file mode 100644 index 0000000..799ad91 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializer.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.mgmt.persist; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.io.Writer; +import java.util.NoSuchElementException; +import java.util.Stack; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.brooklyn.api.catalog.CatalogItem; +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.internal.AbstractBrooklynObjectSpec; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.mgmt.ManagementContext; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.mgmt.classloading.BrooklynClassLoadingContext; +import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister.LookupContext; +import org.apache.brooklyn.api.objs.Identifiable; +import org.apache.brooklyn.api.policy.Policy; +import org.apache.brooklyn.api.sensor.Enricher; +import org.apache.brooklyn.api.sensor.Feed; +import org.apache.brooklyn.core.catalog.internal.CatalogBundleDto; +import org.apache.brooklyn.core.catalog.internal.CatalogUtils; +import org.apache.brooklyn.core.config.BasicConfigKey; +import org.apache.brooklyn.core.mgmt.classloading.BrooklynClassLoadingContextSequential; +import org.apache.brooklyn.core.mgmt.classloading.ClassLoaderFromBrooklynClassLoadingContext; +import org.apache.brooklyn.core.mgmt.classloading.JavaBrooklynClassLoadingContext; +import org.apache.brooklyn.core.mgmt.rebind.dto.BasicCatalogItemMemento; +import org.apache.brooklyn.core.mgmt.rebind.dto.BasicEnricherMemento; +import org.apache.brooklyn.core.mgmt.rebind.dto.BasicEntityMemento; +import org.apache.brooklyn.core.mgmt.rebind.dto.BasicFeedMemento; +import org.apache.brooklyn.core.mgmt.rebind.dto.BasicLocationMemento; +import org.apache.brooklyn.core.mgmt.rebind.dto.BasicPolicyMemento; +import org.apache.brooklyn.core.mgmt.rebind.dto.MutableBrooklynMemento; +import org.apache.brooklyn.effector.core.BasicParameterType; +import org.apache.brooklyn.effector.core.EffectorAndBody; +import org.apache.brooklyn.effector.core.EffectorTasks.EffectorBodyTaskFactory; +import org.apache.brooklyn.effector.core.EffectorTasks.EffectorTaskFactory; +import org.apache.brooklyn.sensor.core.BasicAttributeSensor; +import org.apache.brooklyn.util.core.xstream.XmlSerializer; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.text.Strings; + +import com.thoughtworks.xstream.converters.Converter; +import com.thoughtworks.xstream.converters.MarshallingContext; +import com.thoughtworks.xstream.converters.SingleValueConverter; +import com.thoughtworks.xstream.converters.UnmarshallingContext; +import com.thoughtworks.xstream.converters.reflection.ReflectionConverter; +import com.thoughtworks.xstream.core.ReferencingMarshallingContext; +import com.thoughtworks.xstream.core.util.HierarchicalStreams; +import com.thoughtworks.xstream.io.HierarchicalStreamReader; +import com.thoughtworks.xstream.io.HierarchicalStreamWriter; +import com.thoughtworks.xstream.io.path.PathTrackingReader; +import com.thoughtworks.xstream.mapper.Mapper; +import com.thoughtworks.xstream.mapper.MapperWrapper; + +/* uses xml, cleaned up a bit + * + * there is an early attempt at doing this with JSON in pull request #344 but + * it is not nicely deserializable, see comments at http://xstream.codehaus.org/json-tutorial.html */ +public class XmlMementoSerializer<T> extends XmlSerializer<T> implements MementoSerializer<T> { + + private static final Logger LOG = LoggerFactory.getLogger(XmlMementoSerializer.class); + + private final ClassLoader classLoader; + private LookupContext lookupContext; + + public XmlMementoSerializer(ClassLoader classLoader) { + this.classLoader = checkNotNull(classLoader, "classLoader"); + xstream.setClassLoader(this.classLoader); + + // old (deprecated in 070? or earlier) single-file persistence uses this keyword; TODO remove soon in 080 ? + xstream.alias("brooklyn", MutableBrooklynMemento.class); + + xstream.alias("entity", BasicEntityMemento.class); + xstream.alias("location", BasicLocationMemento.class); + xstream.alias("policy", BasicPolicyMemento.class); + xstream.alias("feed", BasicFeedMemento.class); + xstream.alias("enricher", BasicEnricherMemento.class); + xstream.alias("configKey", BasicConfigKey.class); + xstream.alias("catalogItem", BasicCatalogItemMemento.class); + xstream.alias("bundle", CatalogBundleDto.class); + xstream.alias("attributeSensor", BasicAttributeSensor.class); + + xstream.alias("effector", Effector.class); + xstream.addDefaultImplementation(EffectorAndBody.class, Effector.class); + xstream.alias("parameter", BasicParameterType.class); + xstream.addDefaultImplementation(EffectorBodyTaskFactory.class, EffectorTaskFactory.class); + + xstream.alias("entityRef", Entity.class); + xstream.alias("locationRef", Location.class); + xstream.alias("policyRef", Policy.class); + xstream.alias("enricherRef", Enricher.class); + + xstream.registerConverter(new LocationConverter()); + xstream.registerConverter(new PolicyConverter()); + xstream.registerConverter(new EnricherConverter()); + xstream.registerConverter(new EntityConverter()); + xstream.registerConverter(new FeedConverter()); + xstream.registerConverter(new CatalogItemConverter()); + xstream.registerConverter(new SpecConverter()); + + xstream.registerConverter(new ManagementContextConverter()); + xstream.registerConverter(new TaskConverter(xstream.getMapper())); + + //For compatibility with existing persistence stores content. + xstream.aliasField("registeredTypeName", BasicCatalogItemMemento.class, "symbolicName"); + xstream.registerLocalConverter(BasicCatalogItemMemento.class, "libraries", new CatalogItemLibrariesConverter()); + } + + // Warning: this is called in the super-class constuctor, so before this constructor! + @Override + protected MapperWrapper wrapMapper(MapperWrapper next) { + MapperWrapper mapper = super.wrapMapper(next); + mapper = new CustomMapper(mapper, Entity.class, "entityProxy"); + mapper = new CustomMapper(mapper, Location.class, "locationProxy"); + return mapper; + } + + @Override + public void serialize(Object object, Writer writer) { + super.serialize(object, writer); + try { + writer.append("\n"); + } catch (IOException e) { + throw Exceptions.propagate(e); + } + } + + @Override + public void setLookupContext(LookupContext lookupContext) { + this.lookupContext = checkNotNull(lookupContext, "lookupContext"); + } + + @Override + public void unsetLookupContext() { + this.lookupContext = null; + } + + /** + * For changing the tag used for anything that implements/extends the given type. + * Necessary for using EntityRef rather than the default "dynamic-proxy" tag. + * + * @author aled + */ + public class CustomMapper extends MapperWrapper { + private final Class<?> clazz; + private final String alias; + + public CustomMapper(Mapper wrapped, Class<?> clazz, String alias) { + super(wrapped); + this.clazz = checkNotNull(clazz, "clazz"); + this.alias = checkNotNull(alias, "alias"); + } + + public String getAlias() { + return alias; + } + + @Override + public String serializedClass(@SuppressWarnings("rawtypes") Class type) { + if (type != null && clazz.isAssignableFrom(type)) { + return alias; + } else { + return super.serializedClass(type); + } + } + + @Override + public Class<?> realClass(String elementName) { + if (elementName.equals(alias)) { + return clazz; + } else { + return super.realClass(elementName); + } + } + } + + public abstract class IdentifiableConverter<IT extends Identifiable> implements SingleValueConverter { + private final Class<IT> clazz; + + IdentifiableConverter(Class<IT> clazz) { + this.clazz = clazz; + } + @Override + public boolean canConvert(@SuppressWarnings("rawtypes") Class type) { + boolean result = clazz.isAssignableFrom(type); + return result; + } + + @Override + public String toString(Object obj) { + return obj == null ? null : ((Identifiable)obj).getId(); + } + @Override + public Object fromString(String str) { + if (lookupContext == null) { + LOG.warn("Cannot unmarshal from persisted xml {} {}; no lookup context supplied!", clazz.getSimpleName(), str); + return null; + } else { + return lookup(str); + } + } + + protected abstract IT lookup(String id); + } + + public class LocationConverter extends IdentifiableConverter<Location> { + LocationConverter() { + super(Location.class); + } + @Override + protected Location lookup(String id) { + return lookupContext.lookupLocation(id); + } + } + + public class PolicyConverter extends IdentifiableConverter<Policy> { + PolicyConverter() { + super(Policy.class); + } + @Override + protected Policy lookup(String id) { + return lookupContext.lookupPolicy(id); + } + } + + public class EnricherConverter extends IdentifiableConverter<Enricher> { + EnricherConverter() { + super(Enricher.class); + } + @Override + protected Enricher lookup(String id) { + return lookupContext.lookupEnricher(id); + } + } + + public class FeedConverter extends IdentifiableConverter<Feed> { + FeedConverter() { + super(Feed.class); + } + @Override + protected Feed lookup(String id) { + return lookupContext.lookupFeed(id); + } + } + + public class EntityConverter extends IdentifiableConverter<Entity> { + EntityConverter() { + super(Entity.class); + } + @Override + protected Entity lookup(String id) { + return lookupContext.lookupEntity(id); + } + } + + @SuppressWarnings("rawtypes") + public class CatalogItemConverter extends IdentifiableConverter<CatalogItem> { + CatalogItemConverter() { + super(CatalogItem.class); + } + @Override + protected CatalogItem<?,?> lookup(String id) { + return lookupContext.lookupCatalogItem(id); + } + } + + + static boolean loggedTaskWarning = false; + public class TaskConverter implements Converter { + private final Mapper mapper; + + TaskConverter(Mapper mapper) { + this.mapper = mapper; + } + @Override + public boolean canConvert(@SuppressWarnings("rawtypes") Class type) { + return Task.class.isAssignableFrom(type); + } + @SuppressWarnings("deprecation") + @Override + public void marshal(Object source, HierarchicalStreamWriter writer, MarshallingContext context) { + if (source == null) return; + if (((Task<?>)source).isDone() && !((Task<?>)source).isError()) { + try { + context.convertAnother(((Task<?>)source).get()); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } catch (ExecutionException e) { + LOG.warn("Unexpected exception getting done (and non-error) task result for "+source+"; continuing: "+e, e); + } + } else { + // TODO How to log sensibly, without it logging this every second?! + // jun 2014, have added a "log once" which is not ideal but better than the log never behaviour + if (!loggedTaskWarning) { + LOG.warn("Intercepting and skipping request to serialize a Task" + + (context instanceof ReferencingMarshallingContext ? " at "+((ReferencingMarshallingContext)context).currentPath() : "")+ + " (only logging this once): "+source); + loggedTaskWarning = true; + } + + return; + } + } + @Override + public Object unmarshal(HierarchicalStreamReader reader, UnmarshallingContext context) { + if (reader.hasMoreChildren()) { + Class<?> type = HierarchicalStreams.readClassType(reader, mapper); + reader.moveDown(); + Object result = context.convertAnother(null, type); + reader.moveUp(); + return result; + } else { + return null; + } + } + } + + public class ManagementContextConverter implements Converter { + @Override + public boolean canConvert(@SuppressWarnings("rawtypes") Class type) { + return ManagementContext.class.isAssignableFrom(type); + } + @Override + public void marshal(Object source, HierarchicalStreamWriter writer, MarshallingContext context) { + // write nothing, and always insert the current mgmt context + } + @Override + public Object unmarshal(HierarchicalStreamReader reader, UnmarshallingContext context) { + return lookupContext.lookupManagementContext(); + } + } + + /** When reading/writing specs, it checks whether there is a catalog item id set and uses it to load */ + public class SpecConverter extends ReflectionConverter { + SpecConverter() { + super(xstream.getMapper(), xstream.getReflectionProvider()); + } + @Override + public boolean canConvert(@SuppressWarnings("rawtypes") Class type) { + return AbstractBrooklynObjectSpec.class.isAssignableFrom(type); + } + @Override + public void marshal(Object source, HierarchicalStreamWriter writer, MarshallingContext context) { + if (source == null) return; + AbstractBrooklynObjectSpec<?, ?> spec = (AbstractBrooklynObjectSpec<?, ?>) source; + String catalogItemId = spec.getCatalogItemId(); + if (Strings.isNonBlank(catalogItemId)) { + // write this field first, so we can peek at it when we read + writer.startNode("catalogItemId"); + writer.setValue(catalogItemId); + writer.endNode(); + + // we're going to write the catalogItemId field twice :( but that's okay. + // better solution would be to have mark/reset on reader so we can peek for such a field; + // see comment below + super.marshal(source, writer, context); + } else { + super.marshal(source, writer, context); + } + } + @Override + public Object unmarshal(HierarchicalStreamReader reader, UnmarshallingContext context) { + String catalogItemId = null; + instantiateNewInstanceSettingCache(reader, context); + + if (reader instanceof PathTrackingReader) { + // have to assume this is first; there is no mark/reset support on these readers + // (if there were then it would be easier, we could just look for that child anywhere, + // and not need a custom writer!) + if ("catalogItemId".equals( ((PathTrackingReader)reader).peekNextChild() )) { + // cache the instance + + reader.moveDown(); + catalogItemId = reader.getValue(); + reader.moveUp(); + } + } + boolean customLoaderSet = false; + try { + if (Strings.isNonBlank(catalogItemId)) { + if (lookupContext==null) throw new NullPointerException("lookupContext required to load catalog item "+catalogItemId); + CatalogItem<?, ?> cat = CatalogUtils.getCatalogItemOptionalVersion(lookupContext.lookupManagementContext(), catalogItemId); + if (cat==null) throw new NoSuchElementException("catalog item: "+catalogItemId); + BrooklynClassLoadingContext clcNew = CatalogUtils.newClassLoadingContext(lookupContext.lookupManagementContext(), cat); + pushXstreamCustomClassLoader(clcNew); + customLoaderSet = true; + } + + AbstractBrooklynObjectSpec<?, ?> result = (AbstractBrooklynObjectSpec<?, ?>) super.unmarshal(reader, context); + // we wrote it twice so this shouldn't be necessary; but if we fix it so we only write once, we'd need this + result.catalogItemId(catalogItemId); + return result; + } finally { + instance = null; + if (customLoaderSet) { + popXstreamCustomClassLoader(); + } + } + } + + Object instance; + + @Override + protected Object instantiateNewInstance(HierarchicalStreamReader reader, UnmarshallingContext context) { + // the super calls getAttribute which requires that we have not yet done moveDown, + // so we do this earlier and cache it for when we call super.unmarshal + if (instance==null) + throw new IllegalStateException("Instance should be created and cached"); + return instance; + } + protected void instantiateNewInstanceSettingCache(HierarchicalStreamReader reader, UnmarshallingContext context) { + instance = super.instantiateNewInstance(reader, context); + } + } + + Stack<BrooklynClassLoadingContext> contexts = new Stack<BrooklynClassLoadingContext>(); + Stack<ClassLoader> cls = new Stack<ClassLoader>(); + AtomicReference<Thread> xstreamLockOwner = new AtomicReference<Thread>(); + int lockCount; + + /** Must be accompanied by a corresponding {@link #popXstreamCustomClassLoader()} when finished. */ + @SuppressWarnings("deprecation") + protected void pushXstreamCustomClassLoader(BrooklynClassLoadingContext clcNew) { + acquireXstreamLock(); + BrooklynClassLoadingContext oldClc; + if (!contexts.isEmpty()) { + oldClc = contexts.peek(); + } else { + // TODO XmlMementoSerializer should take a BCLC instead of a CL + oldClc = JavaBrooklynClassLoadingContext.create(lookupContext.lookupManagementContext(), xstream.getClassLoader()); + } + BrooklynClassLoadingContextSequential clcMerged = new BrooklynClassLoadingContextSequential(lookupContext.lookupManagementContext(), + oldClc, clcNew); + contexts.push(clcMerged); + cls.push(xstream.getClassLoader()); + ClassLoader newCL = ClassLoaderFromBrooklynClassLoadingContext.of(clcMerged); + xstream.setClassLoader(newCL); + } + + protected void popXstreamCustomClassLoader() { + synchronized (xstreamLockOwner) { + releaseXstreamLock(); + xstream.setClassLoader(cls.pop()); + contexts.pop(); + } + } + + protected void acquireXstreamLock() { + synchronized (xstreamLockOwner) { + while (true) { + if (xstreamLockOwner.compareAndSet(null, Thread.currentThread()) || + Thread.currentThread().equals( xstreamLockOwner.get() )) { + break; + } + try { + xstreamLockOwner.wait(1000); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + } + lockCount++; + } + } + + protected void releaseXstreamLock() { + synchronized (xstreamLockOwner) { + if (lockCount<=0) { + throw new IllegalStateException("xstream not locked"); + } + if (--lockCount == 0) { + if (!xstreamLockOwner.compareAndSet(Thread.currentThread(), null)) { + Thread oldOwner = xstreamLockOwner.getAndSet(null); + throw new IllegalStateException("xstream was locked by "+oldOwner+" but unlock attempt by "+Thread.currentThread()); + } + xstreamLockOwner.notifyAll(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ActivePartialRebindIteration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ActivePartialRebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ActivePartialRebindIteration.java index 6b586a8..d6c3160 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ActivePartialRebindIteration.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ActivePartialRebindIteration.java @@ -36,8 +36,8 @@ import org.apache.brooklyn.api.objs.BrooklynObject; import org.apache.brooklyn.api.objs.BrooklynObjectType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.brooklyn.core.mgmt.rebind.persister.BrooklynMementoPersisterToObjectStore; -import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceActivityMetrics; +import org.apache.brooklyn.core.mgmt.persist.BrooklynMementoPersisterToObjectStore; +import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics; import org.apache.brooklyn.core.mgmt.rebind.transformer.CompoundTransformer; import org.apache.brooklyn.core.objs.BrooklynObjectInternal; import org.apache.brooklyn.entity.core.EntityInternal; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicLocationRebindSupport.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicLocationRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicLocationRebindSupport.java index 74a0497..c3b657d 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicLocationRebindSupport.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicLocationRebindSupport.java @@ -30,7 +30,7 @@ import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.mgmt.rebind.dto.MementosGenerators; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.brooklyn.location.basic.AbstractLocation; +import org.apache.brooklyn.location.core.AbstractLocation; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.flags.FlagUtils; import org.apache.brooklyn.util.core.flags.TypeCoercions; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ImmediateDeltaChangeListener.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ImmediateDeltaChangeListener.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ImmediateDeltaChangeListener.java index c69dca4..81785bc 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ImmediateDeltaChangeListener.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ImmediateDeltaChangeListener.java @@ -37,7 +37,7 @@ import org.apache.brooklyn.api.mgmt.rebind.mementos.PolicyMemento; import org.apache.brooklyn.api.objs.BrooklynObject; import org.apache.brooklyn.api.policy.Policy; import org.apache.brooklyn.api.sensor.Enricher; -import org.apache.brooklyn.location.basic.LocationInternal; +import org.apache.brooklyn.location.core.internal.LocationInternal; import com.google.common.collect.Maps; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/InitialFullRebindIteration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/InitialFullRebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/InitialFullRebindIteration.java index 8a36a0d..8b4cc29 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/InitialFullRebindIteration.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/InitialFullRebindIteration.java @@ -31,7 +31,7 @@ import org.apache.brooklyn.core.mgmt.internal.BrooklynObjectManagementMode; import org.apache.brooklyn.core.mgmt.internal.EntityManagerInternal; import org.apache.brooklyn.core.mgmt.internal.LocationManagerInternal; import org.apache.brooklyn.core.mgmt.internal.ManagementTransitionMode; -import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceActivityMetrics; +import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics; import org.apache.brooklyn.entity.core.EntityInternal; import org.apache.brooklyn.util.text.Strings; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java index 7d68438..663061d 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java @@ -44,8 +44,8 @@ import org.apache.brooklyn.api.sensor.Enricher; import org.apache.brooklyn.api.sensor.Feed; import org.apache.brooklyn.core.internal.BrooklynFeatureEnablement; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.core.mgmt.rebind.persister.BrooklynPersistenceUtils; -import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceActivityMetrics; +import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils; +import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics; import org.apache.brooklyn.core.objs.BrooklynObjectInternal; import org.apache.brooklyn.entity.core.EntityInternal; import org.apache.brooklyn.util.collections.MutableMap; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java index fc50d30..83cc155 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java @@ -71,8 +71,8 @@ import org.apache.brooklyn.core.mgmt.internal.EntityManagerInternal; import org.apache.brooklyn.core.mgmt.internal.LocationManagerInternal; import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; import org.apache.brooklyn.core.mgmt.internal.ManagementTransitionMode; +import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics; import org.apache.brooklyn.core.mgmt.rebind.RebindManagerImpl.RebindTracker; -import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceActivityMetrics; import org.apache.brooklyn.core.objs.AbstractBrooklynObject; import org.apache.brooklyn.core.objs.BrooklynObjectInternal; import org.apache.brooklyn.core.objs.proxy.InternalEntityFactory; @@ -82,8 +82,8 @@ import org.apache.brooklyn.core.objs.proxy.InternalPolicyFactory; import org.apache.brooklyn.entity.core.AbstractApplication; import org.apache.brooklyn.entity.core.AbstractEntity; import org.apache.brooklyn.entity.core.EntityInternal; -import org.apache.brooklyn.location.basic.AbstractLocation; -import org.apache.brooklyn.location.basic.LocationInternal; +import org.apache.brooklyn.location.core.AbstractLocation; +import org.apache.brooklyn.location.core.internal.LocationInternal; import org.apache.brooklyn.policy.core.AbstractPolicy; import org.apache.brooklyn.sensor.enricher.AbstractEnricher; import org.apache.brooklyn.sensor.feed.AbstractFeed; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java index 3372fef..4c82dbe 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java @@ -48,10 +48,10 @@ import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.internal.BrooklynFeatureEnablement; import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl; import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal; -import org.apache.brooklyn.core.mgmt.rebind.persister.BrooklynMementoPersisterToObjectStore; -import org.apache.brooklyn.core.mgmt.rebind.persister.BrooklynPersistenceUtils; -import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceActivityMetrics; -import org.apache.brooklyn.core.mgmt.rebind.persister.BrooklynPersistenceUtils.CreateBackupMode; +import org.apache.brooklyn.core.mgmt.persist.BrooklynMementoPersisterToObjectStore; +import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils; +import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics; +import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils.CreateBackupMode; import org.apache.brooklyn.core.mgmt.rebind.transformer.CompoundTransformer; import org.apache.brooklyn.core.server.BrooklynServerConfig; import org.apache.brooklyn.entity.core.Entities; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java index c62b064..505c844 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java @@ -49,13 +49,13 @@ import org.apache.brooklyn.api.sensor.Feed; import org.apache.brooklyn.api.sensor.AttributeSensor.SensorPersistenceMode; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.catalog.internal.CatalogItemDo; +import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils; import org.apache.brooklyn.core.mgmt.rebind.AbstractBrooklynObjectRebindSupport; import org.apache.brooklyn.core.mgmt.rebind.TreeUtils; -import org.apache.brooklyn.core.mgmt.rebind.persister.BrooklynPersistenceUtils; import org.apache.brooklyn.core.objs.BrooklynTypes; import org.apache.brooklyn.entity.core.EntityDynamicType; import org.apache.brooklyn.entity.core.EntityInternal; -import org.apache.brooklyn.location.basic.LocationInternal; +import org.apache.brooklyn.location.core.internal.LocationInternal; import org.apache.brooklyn.policy.core.AbstractPolicy; import org.apache.brooklyn.sensor.enricher.AbstractEnricher; import org.apache.brooklyn.sensor.feed.AbstractFeed; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/BrooklynMementoPersisterToObjectStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/BrooklynMementoPersisterToObjectStore.java deleted file mode 100644 index eb22d2e..0000000 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/BrooklynMementoPersisterToObjectStore.java +++ /dev/null @@ -1,697 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.rebind.persister; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import javax.annotation.Nullable; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.api.catalog.CatalogItem; -import org.apache.brooklyn.api.mgmt.rebind.PersistenceExceptionHandler; -import org.apache.brooklyn.api.mgmt.rebind.RebindExceptionHandler; -import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMemento; -import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoManifest; -import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister; -import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData; -import org.apache.brooklyn.api.mgmt.rebind.mementos.CatalogItemMemento; -import org.apache.brooklyn.api.mgmt.rebind.mementos.Memento; -import org.apache.brooklyn.api.objs.BrooklynObject; -import org.apache.brooklyn.api.objs.BrooklynObjectType; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.config.StringConfigMap; -import org.apache.brooklyn.core.catalog.internal.CatalogUtils; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.mgmt.classloading.ClassLoaderFromBrooklynClassLoadingContext; -import org.apache.brooklyn.core.mgmt.rebind.PeriodicDeltaChangeListener; -import org.apache.brooklyn.core.mgmt.rebind.PersisterDeltaImpl; -import org.apache.brooklyn.core.mgmt.rebind.dto.BrooklynMementoImpl; -import org.apache.brooklyn.core.mgmt.rebind.dto.BrooklynMementoManifestImpl; -import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceObjectStore.StoreObjectAccessor; -import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceObjectStore.StoreObjectAccessorWithLock; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.xstream.XmlUtil; -import org.apache.brooklyn.util.exceptions.CompoundRuntimeException; -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.text.Strings; -import org.apache.brooklyn.util.time.Duration; -import org.apache.brooklyn.util.time.Time; - -import com.google.common.annotations.Beta; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - -/** Implementation of the {@link BrooklynMementoPersister} backed by a pluggable - * {@link PersistenceObjectStore} such as a file system or a jclouds object store */ -public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPersister { - - // TODO Crazy amount of duplication between handling entity, location, policy, enricher + feed; - // Need to remove that duplication. - - // TODO Should stop() take a timeout, and shutdown the executor gracefully? - - private static final Logger LOG = LoggerFactory.getLogger(BrooklynMementoPersisterToObjectStore.class); - - public static final ConfigKey<Integer> PERSISTER_MAX_THREAD_POOL_SIZE = ConfigKeys.newIntegerConfigKey( - "persister.threadpool.maxSize", - "Maximum number of concurrent operations for persistence (reads/writes/deletes of *different* objects)", - 10); - - public static final ConfigKey<Integer> PERSISTER_MAX_SERIALIZATION_ATTEMPTS = ConfigKeys.newIntegerConfigKey( - "persister.maxSerializationAttempts", - "Maximum number of attempts to serialize a memento (e.g. if first attempts fail because of concurrent modifications of an entity)", - 5); - - private final PersistenceObjectStore objectStore; - private final MementoSerializer<Object> serializerWithStandardClassLoader; - - private final Map<String, StoreObjectAccessorWithLock> writers = new LinkedHashMap<String, PersistenceObjectStore.StoreObjectAccessorWithLock>(); - - private final ListeningExecutorService executor; - - private volatile boolean writesAllowed = false; - private volatile boolean writesShuttingDown = false; - private StringConfigMap brooklynProperties; - - private List<Delta> queuedDeltas = new CopyOnWriteArrayList<BrooklynMementoPersister.Delta>(); - - /** - * Lock used on writes (checkpoint + delta) so that {@link #waitForWritesCompleted(Duration)} can block - * for any concurrent call to complete. - */ - private final ReadWriteLock lock = new ReentrantReadWriteLock(true); - - public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, StringConfigMap brooklynProperties, ClassLoader classLoader) { - this.objectStore = checkNotNull(objectStore, "objectStore"); - this.brooklynProperties = brooklynProperties; - - int maxSerializationAttempts = brooklynProperties.getConfig(PERSISTER_MAX_SERIALIZATION_ATTEMPTS); - MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader); - this.serializerWithStandardClassLoader = new RetryingMementoSerializer<Object>(rawSerializer, maxSerializationAttempts); - - int maxThreadPoolSize = brooklynProperties.getConfig(PERSISTER_MAX_THREAD_POOL_SIZE); - - objectStore.createSubPath("entities"); - objectStore.createSubPath("locations"); - objectStore.createSubPath("policies"); - objectStore.createSubPath("enrichers"); - objectStore.createSubPath("feeds"); - objectStore.createSubPath("catalog"); - - // FIXME does it belong here or to ManagementPlaneSyncRecordPersisterToObjectStore ? - objectStore.createSubPath("plane"); - - executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxThreadPoolSize, new ThreadFactory() { - @Override public Thread newThread(Runnable r) { - // Note: Thread name referenced in logback-includes' ThreadNameDiscriminator - return new Thread(r, "brooklyn-persister"); - }})); - } - - public MementoSerializer<Object> getMementoSerializer() { - return getSerializerWithStandardClassLoader(); - } - - protected MementoSerializer<Object> getSerializerWithStandardClassLoader() { - return serializerWithStandardClassLoader; - } - - protected MementoSerializer<Object> getSerializerWithCustomClassLoader(LookupContext lookupContext, BrooklynObjectType type, String objectId) { - ClassLoader cl = getCustomClassLoaderForBrooklynObject(lookupContext, type, objectId); - if (cl==null) return serializerWithStandardClassLoader; - return getSerializerWithCustomClassLoader(lookupContext, cl); - } - - protected MementoSerializer<Object> getSerializerWithCustomClassLoader(LookupContext lookupContext, ClassLoader classLoader) { - int maxSerializationAttempts = brooklynProperties.getConfig(PERSISTER_MAX_SERIALIZATION_ATTEMPTS); - MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader); - MementoSerializer<Object> result = new RetryingMementoSerializer<Object>(rawSerializer, maxSerializationAttempts); - result.setLookupContext(lookupContext); - return result; - } - - @Nullable protected ClassLoader getCustomClassLoaderForBrooklynObject(LookupContext lookupContext, BrooklynObjectType type, String objectId) { - BrooklynObject item = lookupContext.peek(type, objectId); - String catalogItemId = (item == null) ? null : item.getCatalogItemId(); - // TODO enrichers etc aren't yet known -- would need to backtrack to the entity to get them from bundles - if (catalogItemId == null) { - return null; - } - // See RebindIteration.BrooklynObjectInstantiator.load(), for handling where catalog item is missing; - // similar logic here. - CatalogItem<?, ?> catalogItem = CatalogUtils.getCatalogItemOptionalVersion(lookupContext.lookupManagementContext(), catalogItemId); - if (catalogItem == null) { - // TODO do we need to only log once, rather than risk log.warn too often? I think this only happens on rebind, so ok. - LOG.warn("Unable to load catalog item "+catalogItemId+" for custom class loader of "+type+" "+objectId+"; will use default class loader"); - return null; - } else { - return ClassLoaderFromBrooklynClassLoadingContext.of(CatalogUtils.newClassLoadingContext(lookupContext.lookupManagementContext(), catalogItem)); - } - } - - @Override public void enableWriteAccess() { - writesAllowed = true; - } - - @Override - public void disableWriteAccess(boolean graceful) { - writesShuttingDown = true; - try { - writesAllowed = false; - // a very long timeout to ensure we don't lose state. - // If persisting thousands of entities over slow network to Object Store, could take minutes. - waitForWritesCompleted(Duration.ONE_HOUR); - - } catch (Exception e) { - throw Exceptions.propagate(e); - } finally { - writesShuttingDown = false; - } - } - - @Override - public void stop(boolean graceful) { - disableWriteAccess(graceful); - - if (executor != null) { - if (graceful) { - executor.shutdown(); - try { - // should be quick because we've just turned off writes, waiting for their completion - executor.awaitTermination(1, TimeUnit.MINUTES); - } catch (InterruptedException e) { - throw Exceptions.propagate(e); - } - } else { - executor.shutdownNow(); - } - } - } - - public PersistenceObjectStore getObjectStore() { - return objectStore; - } - - protected StoreObjectAccessorWithLock getWriter(String path) { - String id = path.substring(path.lastIndexOf('/')+1); - synchronized (writers) { - StoreObjectAccessorWithLock writer = writers.get(id); - if (writer == null) { - writer = new StoreObjectAccessorLocking( objectStore.newAccessor(path) ); - writers.put(id, writer); - } - return writer; - } - } - - private Map<String,String> makeIdSubPathMap(Iterable<String> subPathLists) { - Map<String,String> result = MutableMap.of(); - for (String subpath: subPathLists) { - String id = subpath; - id = id.substring(id.lastIndexOf('/')+1); - id = id.substring(id.lastIndexOf('\\')+1); - // assumes id is the filename; should work even if not, as id is later read from xpath - // but you'll get warnings (and possibility of loss if there is a collision) - result.put(id, subpath); - } - return result; - } - - protected BrooklynMementoRawData listMementoSubPathsAsData(final RebindExceptionHandler exceptionHandler) { - final BrooklynMementoRawData.Builder subPathDataBuilder = BrooklynMementoRawData.builder(); - - Stopwatch stopwatch = Stopwatch.createStarted(); - try { - for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) - subPathDataBuilder.putAll(type, makeIdSubPathMap(objectStore.listContentsWithSubPath(type.getSubPathName()))); - - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - exceptionHandler.onLoadMementoFailed(BrooklynObjectType.UNKNOWN, "Failed to list files", e); - throw new IllegalStateException("Failed to list memento files in "+objectStore, e); - } - - BrooklynMementoRawData subPathData = subPathDataBuilder.build(); - LOG.debug("Loaded rebind lists; took {}: {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items; from {}", new Object[]{ - Time.makeTimeStringRounded(stopwatch), - subPathData.getEntities().size(), subPathData.getLocations().size(), subPathData.getPolicies().size(), subPathData.getEnrichers().size(), - subPathData.getFeeds().size(), subPathData.getCatalogItems().size(), - objectStore.getSummaryName() }); - - return subPathData; - } - - public BrooklynMementoRawData loadMementoRawData(final RebindExceptionHandler exceptionHandler) { - BrooklynMementoRawData subPathData = listMementoSubPathsAsData(exceptionHandler); - - final BrooklynMementoRawData.Builder builder = BrooklynMementoRawData.builder(); - - Visitor loaderVisitor = new Visitor() { - @Override - public void visit(BrooklynObjectType type, String id, String contentsSubpath) throws Exception { - String contents = null; - try { - contents = read(contentsSubpath); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - exceptionHandler.onLoadMementoFailed(type, "memento "+id+" read error", e); - } - - String xmlId = (String) XmlUtil.xpath(contents, "/"+type.toCamelCase()+"/id"); - String safeXmlId = Strings.makeValidFilename(xmlId); - if (!Objects.equal(id, safeXmlId)) - LOG.warn("ID mismatch on "+type.toCamelCase()+", "+id+" from path, "+safeXmlId+" from xml"); - - builder.put(type, xmlId, contents); - } - }; - - Stopwatch stopwatch = Stopwatch.createStarted(); - - visitMemento("loading raw", subPathData, loaderVisitor, exceptionHandler); - - BrooklynMementoRawData result = builder.build(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Loaded rebind raw data; took {}; {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items, from {}", new Object[]{ - Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntities().size(), - result.getLocations().size(), result.getPolicies().size(), result.getEnrichers().size(), - result.getFeeds().size(), result.getCatalogItems().size(), - objectStore.getSummaryName() }); - } - - return result; - } - - @Override - public BrooklynMementoManifest loadMementoManifest(BrooklynMementoRawData mementoData, final RebindExceptionHandler exceptionHandler) throws IOException { - if (mementoData==null) - mementoData = loadMementoRawData(exceptionHandler); - - final BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder(); - - Visitor visitor = new Visitor() { - @Override - public void visit(BrooklynObjectType type, String objectId, final String contents) throws Exception { - final String prefix = "/"+type.toCamelCase()+"/"; - - class XPathHelper { - private String get(String innerPath) { - return (String) XmlUtil.xpath(contents, prefix+innerPath); - } - } - XPathHelper x = new XPathHelper(); - - switch (type) { - case ENTITY: - builder.entity(x.get("id"), x.get("type"), - Strings.emptyToNull(x.get("parent")), Strings.emptyToNull(x.get("catalogItemId"))); - break; - case LOCATION: - case POLICY: - case ENRICHER: - case FEED: - builder.putType(type, x.get("id"), x.get("type")); - break; - case CATALOG_ITEM: - try { - CatalogItemMemento memento = (CatalogItemMemento) getSerializerWithStandardClassLoader().fromString(contents); - if (memento == null) { - LOG.warn("No "+type.toCamelCase()+"-memento deserialized from " + objectId + "; ignoring and continuing"); - } else { - builder.catalogItem(memento); - } - } catch (Exception e) { - exceptionHandler.onLoadMementoFailed(type, "memento "+objectId+" early catalog deserialization error", e); - } - break; - default: - throw new IllegalStateException("Unexpected brooklyn type: "+type); - } - } - }; - - Stopwatch stopwatch = Stopwatch.createStarted(); - - visitMemento("manifests", mementoData, visitor, exceptionHandler); - - BrooklynMementoManifest result = builder.build(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Loaded rebind manifests; took {}: {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items; from {}", new Object[]{ - Time.makeTimeStringRounded(stopwatch), - result.getEntityIdToManifest().size(), result.getLocationIdToType().size(), - result.getPolicyIdToType().size(), result.getEnricherIdToType().size(), result.getFeedIdToType().size(), - result.getCatalogItemMementos().size(), - objectStore.getSummaryName() }); - } - - return result; - } - - @Override - public BrooklynMemento loadMemento(BrooklynMementoRawData mementoData, final LookupContext lookupContext, final RebindExceptionHandler exceptionHandler) throws IOException { - if (mementoData==null) - mementoData = loadMementoRawData(exceptionHandler); - - Stopwatch stopwatch = Stopwatch.createStarted(); - - final BrooklynMementoImpl.Builder builder = BrooklynMementoImpl.builder(); - - Visitor visitor = new Visitor() { - @Override - public void visit(BrooklynObjectType type, String objectId, String contents) throws Exception { - try { - Memento memento = (Memento) getSerializerWithCustomClassLoader(lookupContext, type, objectId).fromString(contents); - if (memento == null) { - LOG.warn("No "+type.toCamelCase()+"-memento deserialized from " + objectId + "; ignoring and continuing"); - } else { - builder.memento(memento); - } - } catch (Exception e) { - exceptionHandler.onLoadMementoFailed(type, "memento "+objectId+" deserialization error", e); - } - } - - }; - - // TODO not convinced this is single threaded on reads; maybe should get a new one each time? - getSerializerWithStandardClassLoader().setLookupContext(lookupContext); - try { - visitMemento("deserialization", mementoData, visitor, exceptionHandler); - } finally { - getSerializerWithStandardClassLoader().unsetLookupContext(); - } - - BrooklynMemento result = builder.build(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Loaded rebind mementos; took {}: {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items, from {}", new Object[]{ - Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntityIds().size(), - result.getLocationIds().size(), result.getPolicyIds().size(), result.getEnricherIds().size(), - result.getFeedIds().size(), result.getCatalogItemIds().size(), - objectStore.getSummaryName() }); - } - - return result; - } - - protected interface Visitor { - public void visit(BrooklynObjectType type, String id, String contents) throws Exception; - } - - protected void visitMemento(final String phase, final BrooklynMementoRawData rawData, final Visitor visitor, final RebindExceptionHandler exceptionHandler) { - List<ListenableFuture<?>> futures = Lists.newArrayList(); - - class VisitorWrapper implements Runnable { - private final BrooklynObjectType type; - private final Map.Entry<String,String> objectIdAndData; - public VisitorWrapper(BrooklynObjectType type, Map.Entry<String,String> objectIdAndData) { - this.type = type; - this.objectIdAndData = objectIdAndData; - } - public void run() { - try { - visitor.visit(type, objectIdAndData.getKey(), objectIdAndData.getValue()); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - exceptionHandler.onLoadMementoFailed(type, "memento "+objectIdAndData.getKey()+" "+phase+" error", e); - } - } - } - - for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) { - for (final Map.Entry<String,String> entry : rawData.getObjectsOfType(type).entrySet()) { - futures.add(executor.submit(new VisitorWrapper(type, entry))); - } - } - - try { - // Wait for all, failing fast if any exceptions. - Futures.allAsList(futures).get(); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - - List<Exception> exceptions = Lists.newArrayList(); - - for (ListenableFuture<?> future : futures) { - if (future.isDone()) { - try { - future.get(); - } catch (InterruptedException e2) { - throw Exceptions.propagate(e2); - } catch (ExecutionException e2) { - LOG.warn("Problem loading memento ("+phase+"): "+e2, e2); - exceptions.add(e2); - } - future.cancel(true); - } - } - if (exceptions.isEmpty()) { - throw Exceptions.propagate(e); - } else { - // Normally there should be at lesat one failure; otherwise all.get() would not have failed. - throw new CompoundRuntimeException("Problem loading mementos ("+phase+")", exceptions); - } - } - } - - protected void checkWritesAllowed() { - if (!writesAllowed && !writesShuttingDown) { - throw new IllegalStateException("Writes not allowed in "+this); - } - } - - /** See {@link BrooklynPersistenceUtils} for conveniences for using this method. */ - @Override - @Beta - public void checkpoint(BrooklynMementoRawData newMemento, PersistenceExceptionHandler exceptionHandler) { - checkWritesAllowed(); - try { - lock.writeLock().lockInterruptibly(); - } catch (InterruptedException e) { - throw Exceptions.propagate(e); - } - - try { - objectStore.prepareForMasterUse(); - - Stopwatch stopwatch = Stopwatch.createStarted(); - List<ListenableFuture<?>> futures = Lists.newArrayList(); - - for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) { - for (Map.Entry<String, String> entry : newMemento.getObjectsOfType(type).entrySet()) { - futures.add(asyncPersist(type.getSubPathName(), type, entry.getKey(), entry.getValue(), exceptionHandler)); - } - } - - try { - // Wait for all the tasks to complete or fail, rather than aborting on the first failure. - // But then propagate failure if any fail. (hence the two calls). - Futures.successfulAsList(futures).get(); - Futures.allAsList(futures).get(); - } catch (Exception e) { - throw Exceptions.propagate(e); - } - if (LOG.isDebugEnabled()) LOG.debug("Checkpointed entire memento in {}", Time.makeTimeStringRounded(stopwatch)); - } finally { - lock.writeLock().unlock(); - } - } - - @Override - public void delta(Delta delta, PersistenceExceptionHandler exceptionHandler) { - checkWritesAllowed(); - - while (!queuedDeltas.isEmpty()) { - Delta extraDelta = queuedDeltas.remove(0); - doDelta(extraDelta, exceptionHandler, true); - } - - doDelta(delta, exceptionHandler, false); - } - - protected void doDelta(Delta delta, PersistenceExceptionHandler exceptionHandler, boolean previouslyQueued) { - Stopwatch stopwatch = deltaImpl(delta, exceptionHandler); - - if (LOG.isDebugEnabled()) LOG.debug("Checkpointed "+(previouslyQueued ? "previously queued " : "")+"delta of memento in {}: " - + "updated {} entities, {} locations, {} policies, {} enrichers, {} catalog items; " - + "removed {} entities, {} locations, {} policies, {} enrichers, {} catalog items", - new Object[] {Time.makeTimeStringRounded(stopwatch), - delta.entities().size(), delta.locations().size(), delta.policies().size(), delta.enrichers().size(), delta.catalogItems().size(), - delta.removedEntityIds().size(), delta.removedLocationIds().size(), delta.removedPolicyIds().size(), delta.removedEnricherIds().size(), delta.removedCatalogItemIds().size()}); - } - - @Override - public void queueDelta(Delta delta) { - queuedDeltas.add(delta); - } - - /** - * Concurrent calls will queue-up (the lock is "fair", which means an "approximately arrival-order policy"). - * Current usage is with the {@link PeriodicDeltaChangeListener} so we expect only one call at a time. - * - * TODO Longer term, if we care more about concurrent calls we could merge the queued deltas so that we - * don't do unnecessary repeated writes of an entity. - */ - private Stopwatch deltaImpl(Delta delta, PersistenceExceptionHandler exceptionHandler) { - try { - lock.writeLock().lockInterruptibly(); - } catch (InterruptedException e) { - throw Exceptions.propagate(e); - } - try { - objectStore.prepareForMasterUse(); - - Stopwatch stopwatch = Stopwatch.createStarted(); - List<ListenableFuture<?>> futures = Lists.newArrayList(); - - for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) { - for (Memento entity : delta.getObjectsOfType(type)) { - futures.add(asyncPersist(type.getSubPathName(), entity, exceptionHandler)); - } - } - for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) { - for (String id : delta.getRemovedIdsOfType(type)) { - futures.add(asyncDelete(type.getSubPathName(), id, exceptionHandler)); - } - } - - try { - // Wait for all the tasks to complete or fail, rather than aborting on the first failure. - // But then propagate failure if any fail. (hence the two calls). - Futures.successfulAsList(futures).get(); - Futures.allAsList(futures).get(); - } catch (Exception e) { - throw Exceptions.propagate(e); - } - - return stopwatch; - } finally { - lock.writeLock().unlock(); - } - } - - @Override - public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException { - boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS); - if (locked) { - ImmutableSet<StoreObjectAccessorWithLock> wc; - synchronized (writers) { - wc = ImmutableSet.copyOf(writers.values()); - } - lock.readLock().unlock(); - - // Belt-and-braces: the lock above should be enough to ensure no outstanding writes, because - // each writer is now synchronous. - for (StoreObjectAccessorWithLock writer : wc) { - writer.waitForCurrentWrites(timeout); - } - } else { - throw new TimeoutException("Timeout waiting for writes to "+objectStore); - } - } - - private String read(String subPath) { - StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath); - return objectAccessor.get(); - } - - private void persist(String subPath, Memento memento, PersistenceExceptionHandler exceptionHandler) { - try { - getWriter(getPath(subPath, memento.getId())).put(getSerializerWithStandardClassLoader().toString(memento)); - } catch (Exception e) { - exceptionHandler.onPersistMementoFailed(memento, e); - } - } - - private void persist(String subPath, BrooklynObjectType type, String id, String content, PersistenceExceptionHandler exceptionHandler) { - try { - if (content==null) { - LOG.warn("Null content for "+type+" "+id); - } - getWriter(getPath(subPath, id)).put(content); - } catch (Exception e) { - exceptionHandler.onPersistRawMementoFailed(type, id, e); - } - } - - private void delete(String subPath, String id, PersistenceExceptionHandler exceptionHandler) { - try { - StoreObjectAccessorWithLock w = getWriter(getPath(subPath, id)); - w.delete(); - synchronized (writers) { - writers.remove(id); - } - } catch (Exception e) { - exceptionHandler.onDeleteMementoFailed(id, e); - } - } - - private ListenableFuture<?> asyncPersist(final String subPath, final Memento memento, final PersistenceExceptionHandler exceptionHandler) { - return executor.submit(new Runnable() { - public void run() { - persist(subPath, memento, exceptionHandler); - }}); - } - - private ListenableFuture<?> asyncPersist(final String subPath, final BrooklynObjectType type, final String id, final String content, final PersistenceExceptionHandler exceptionHandler) { - return executor.submit(new Runnable() { - public void run() { - persist(subPath, type, id, content, exceptionHandler); - }}); - } - - private ListenableFuture<?> asyncDelete(final String subPath, final String id, final PersistenceExceptionHandler exceptionHandler) { - return executor.submit(new Runnable() { - public void run() { - delete(subPath, id, exceptionHandler); - }}); - } - - private String getPath(String subPath, String id) { - return subPath+"/"+Strings.makeValidFilename(id); - } - - @Override - public String getBackingStoreDescription() { - return getObjectStore().getSummaryName(); - } -}
