delete catalog items from persistence store on reset
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/48ce0df8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/48ce0df8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/48ce0df8 Branch: refs/heads/master Commit: 48ce0df851b34e0f313178cb6c4a9dc582e88f13 Parents: ddbb43c Author: Alex Heneveld <alex.henev...@cloudsoftcorp.com> Authored: Wed May 6 17:49:56 2015 +0100 Committer: Alex Heneveld <alex.henev...@cloudsoftcorp.com> Committed: Fri May 8 18:22:22 2015 +0100 ---------------------------------------------------------------------- .../mementos/BrooklynMementoPersister.java | 7 +++++-- .../mementos/BrooklynMementoRawData.java | 6 ++++++ .../brooklyn/entity/rebind/RebindIteration.java | 9 ++++++++- .../AbstractBrooklynMementoPersister.java | 12 +++++++++++- .../BrooklynMementoPersisterToMultiFile.java | 6 +++++- .../BrooklynMementoPersisterToObjectStore.java | 19 ++++++++++++++++++- 6 files changed, 53 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/48ce0df8/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java b/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java index a9eb65f..33f1de7 100644 --- a/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java +++ b/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java @@ -97,10 +97,13 @@ public interface BrooklynMementoPersister { /** @deprecated since 0.7.0, use {@link #checkpoint(BrooklynMementoRawData, PersistenceExceptionHandler)} * and javadoc on implementations of that */ @Deprecated // pretty sure this is not used outwith deprecated code void checkpoint(BrooklynMemento memento, 
PersistenceExceptionHandler exceptionHandler); - - void checkpoint(BrooklynMementoRawData newMemento, PersistenceExceptionHandler exceptionHandler); + /** applies a full checkpoint (write) of all state */ + void checkpoint(BrooklynMementoRawData newMemento, PersistenceExceptionHandler exceptionHandler); + /** applies a partial write of state delta */ void delta(Delta delta, PersistenceExceptionHandler exceptionHandler); + /** inserts an additional delta to be written on the next delta request */ + void queueDelta(Delta delta); void enableWriteAccess(); void disableWriteAccess(boolean graceful); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/48ce0df8/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java b/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java index 32c7232..049b0c2 100644 --- a/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java +++ b/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java @@ -160,6 +160,12 @@ public class BrooklynMementoRawData { return Collections.unmodifiableMap(catalogItems); } + // to handle reset catalog + @Beta + public void clearCatalogItems() { + catalogItems.clear(); + } + public boolean isEmpty() { return entities.isEmpty() && locations.isEmpty() && policies.isEmpty() && enrichers.isEmpty() && feeds.isEmpty() && catalogItems.isEmpty(); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/48ce0df8/core/src/main/java/brooklyn/entity/rebind/RebindIteration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindIteration.java b/core/src/main/java/brooklyn/entity/rebind/RebindIteration.java index 3355851..136cb5b 100644 --- a/core/src/main/java/brooklyn/entity/rebind/RebindIteration.java +++ 
b/core/src/main/java/brooklyn/entity/rebind/RebindIteration.java @@ -363,7 +363,14 @@ public abstract class RebindIteration { logRebindingDebug(message); itemsForResettingCatalog = MutableList.<CatalogItem<?,?>>of(); - // TODO destroy persisted items + + PersisterDeltaImpl delta = new PersisterDeltaImpl(); + for (String catalogItemId: mementoRawData.getCatalogItems().keySet()) { + delta.removedCatalogItemIds.add(catalogItemId); + } + getPersister().queueDelta(delta); + + mementoRawData.clearCatalogItems(); needsInitialCatalog = true; } else { if (!isEmpty) { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/48ce0df8/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java b/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java index d296e35..d3f85df 100644 --- a/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java +++ b/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java @@ -19,6 +19,10 @@ package brooklyn.entity.rebind.persister; import static com.google.common.base.Preconditions.checkNotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import brooklyn.entity.rebind.PersistenceExceptionHandler; import brooklyn.entity.rebind.RebindExceptionHandler; import brooklyn.entity.rebind.dto.BrooklynMementoManifestImpl; @@ -39,6 +43,8 @@ import brooklyn.mementos.PolicyMemento; @Deprecated public abstract class AbstractBrooklynMementoPersister implements BrooklynMementoPersister { + private static final Logger log = LoggerFactory.getLogger(AbstractBrooklynMementoPersister.class); + protected volatile MutableBrooklynMemento memento = new MutableBrooklynMemento(); @Override @@ -96,7 +102,6 @@ public abstract class 
AbstractBrooklynMementoPersister implements BrooklynMement throw new IllegalStateException("Not supported; use "+BrooklynMementoPersisterToObjectStore.class); } - @Override public void delta(Delta delta, PersistenceExceptionHandler exceptionHanlder) { memento.removeEntities(delta.removedEntityIds()); @@ -110,6 +115,11 @@ public abstract class AbstractBrooklynMementoPersister implements BrooklynMement memento.updateEnricherMementos(delta.enrichers()); memento.updateCatalogItemMementos(delta.catalogItems()); } + + @Override + public void queueDelta(Delta delta) { + log.warn("Legacy persister ignoring queued delta: "+delta); + } @Override public String getBackingStoreDescription() { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/48ce0df8/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java index d3318b3..afb3d61 100644 --- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java +++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java @@ -47,7 +47,6 @@ import brooklyn.mementos.EntityMemento; import brooklyn.mementos.LocationMemento; import brooklyn.mementos.PolicyMemento; import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.io.FileUtil; import brooklyn.util.text.Strings; import brooklyn.util.time.Duration; import brooklyn.util.time.Time; @@ -441,6 +440,11 @@ public class BrooklynMementoPersisterToMultiFile implements BrooklynMementoPersi } } + @Override + public void queueDelta(Delta delta) { + LOG.warn("Legacy persister ignoring queued delta: "+delta); + } + @VisibleForTesting public File getDir() { return dir; 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/48ce0df8/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java index 81b2cc6..47eefdc 100644 --- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java +++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -110,6 +111,8 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer private volatile boolean writesShuttingDown = false; private StringConfigMap brooklynProperties; + private List<Delta> queuedDeltas = new CopyOnWriteArrayList<BrooklynMementoPersister.Delta>(); + /** * Lock used on writes (checkpoint + delta) so that {@link #waitForWritesCompleted(Duration)} can block * for any concurrent call to complete. 
@@ -560,9 +563,18 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer public void delta(Delta delta, PersistenceExceptionHandler exceptionHandler) { checkWritesAllowed(); + while (!queuedDeltas.isEmpty()) { + Delta extraDelta = queuedDeltas.remove(0); + doDelta(extraDelta, exceptionHandler, true /* previouslyQueued: drained from queuedDeltas, so the debug log labels it as such */); + } + + doDelta(delta, exceptionHandler, false); + } + + protected void doDelta(Delta delta, PersistenceExceptionHandler exceptionHandler, boolean previouslyQueued) { Stopwatch stopwatch = deltaImpl(delta, exceptionHandler); - if (LOG.isDebugEnabled()) LOG.debug("Checkpointed delta of memento in {}: " + if (LOG.isDebugEnabled()) LOG.debug("Checkpointed "+(previouslyQueued ? "previously queued " : "")+"delta of memento in {}: " + "updated {} entities, {} locations, {} policies, {} enrichers, {} catalog items; " + "removed {} entities, {} locations, {} policies, {} enrichers, {} catalog items", new Object[] {Time.makeTimeStringRounded(stopwatch), @@ -570,6 +582,11 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer delta.removedEntityIds().size(), delta.removedLocationIds().size(), delta.removedPolicyIds().size(), delta.removedEnricherIds().size(), delta.removedCatalogItemIds().size()}); } + @Override + public void queueDelta(Delta delta) { + queuedDeltas.add(delta); + } + /** * Concurrent calls will queue-up (the lock is "fair", which means an "approximately arrival-order policy"). * Current usage is with the {@link PeriodicDeltaChangeListener} so we expect only one call at a time.