delete catalog items from persistence store on reset

Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/48ce0df8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/48ce0df8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/48ce0df8

Branch: refs/heads/master
Commit: 48ce0df851b34e0f313178cb6c4a9dc582e88f13
Parents: ddbb43c
Author: Alex Heneveld <alex.henev...@cloudsoftcorp.com>
Authored: Wed May 6 17:49:56 2015 +0100
Committer: Alex Heneveld <alex.henev...@cloudsoftcorp.com>
Committed: Fri May 8 18:22:22 2015 +0100

----------------------------------------------------------------------
 .../mementos/BrooklynMementoPersister.java       |  7 +++++--
 .../mementos/BrooklynMementoRawData.java         |  6 ++++++
 .../brooklyn/entity/rebind/RebindIteration.java  |  9 ++++++++-
 .../AbstractBrooklynMementoPersister.java        | 12 +++++++++++-
 .../BrooklynMementoPersisterToMultiFile.java     |  6 +++++-
 .../BrooklynMementoPersisterToObjectStore.java   | 19 ++++++++++++++++++-
 6 files changed, 53 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/48ce0df8/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java b/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
index a9eb65f..33f1de7 100644
--- a/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
+++ b/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
@@ -97,10 +97,13 @@ public interface BrooklynMementoPersister {
     /** @deprecated since 0.7.0, use {@link 
#checkpoint(BrooklynMementoRawData, PersistenceExceptionHandler)} 
      * and javadoc on implementations of that */ @Deprecated  // pretty sure 
this is not used outwith deprecated code
     void checkpoint(BrooklynMemento memento, PersistenceExceptionHandler 
exceptionHandler);
-    
-    void checkpoint(BrooklynMementoRawData newMemento, 
PersistenceExceptionHandler exceptionHandler);
 
+    /** applies a full checkpoint (write) of all state */  
+    void checkpoint(BrooklynMementoRawData newMemento, 
PersistenceExceptionHandler exceptionHandler);
+    /** applies a partial write of state delta */  
     void delta(Delta delta, PersistenceExceptionHandler exceptionHandler);
+    /** inserts an additional delta to be written on the next delta request */
+    void queueDelta(Delta delta);
 
     void enableWriteAccess();
     void disableWriteAccess(boolean graceful);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/48ce0df8/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java b/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java
index 32c7232..049b0c2 100644
--- a/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java
+++ b/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java
@@ -160,6 +160,12 @@ public class BrooklynMementoRawData {
         return Collections.unmodifiableMap(catalogItems);
     }
     
+    // to handle reset catalog
+    @Beta
+    public void clearCatalogItems() {
+        catalogItems.clear();
+    }
+    
     public boolean isEmpty() {
         return entities.isEmpty() && locations.isEmpty() && policies.isEmpty() 
&& enrichers.isEmpty() && feeds.isEmpty() && catalogItems.isEmpty();
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/48ce0df8/core/src/main/java/brooklyn/entity/rebind/RebindIteration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindIteration.java b/core/src/main/java/brooklyn/entity/rebind/RebindIteration.java
index 3355851..136cb5b 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindIteration.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindIteration.java
@@ -363,7 +363,14 @@ public abstract class RebindIteration {
                 logRebindingDebug(message);
 
                 itemsForResettingCatalog = MutableList.<CatalogItem<?,?>>of();
-                // TODO destroy persisted items
+                
+                PersisterDeltaImpl delta = new PersisterDeltaImpl();
+                for (String catalogItemId: 
mementoRawData.getCatalogItems().keySet()) {
+                    delta.removedCatalogItemIds.add(catalogItemId);
+                }
+                getPersister().queueDelta(delta);
+                
+                mementoRawData.clearCatalogItems();
                 needsInitialCatalog = true;
             } else {
                 if (!isEmpty) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/48ce0df8/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java b/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
index d296e35..d3f85df 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
@@ -19,6 +19,10 @@
 package brooklyn.entity.rebind.persister;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import brooklyn.entity.rebind.PersistenceExceptionHandler;
 import brooklyn.entity.rebind.RebindExceptionHandler;
 import brooklyn.entity.rebind.dto.BrooklynMementoManifestImpl;
@@ -39,6 +43,8 @@ import brooklyn.mementos.PolicyMemento;
 @Deprecated
 public abstract class AbstractBrooklynMementoPersister implements 
BrooklynMementoPersister {
 
+    private static final Logger log = 
LoggerFactory.getLogger(AbstractBrooklynMementoPersister.class);
+    
     protected volatile MutableBrooklynMemento memento = new 
MutableBrooklynMemento();
     
     @Override
@@ -96,7 +102,6 @@ public abstract class AbstractBrooklynMementoPersister 
implements BrooklynMement
         throw new IllegalStateException("Not supported; use 
"+BrooklynMementoPersisterToObjectStore.class);
     }
     
-
     @Override
     public void delta(Delta delta, PersistenceExceptionHandler 
exceptionHanlder) {
         memento.removeEntities(delta.removedEntityIds());
@@ -110,6 +115,11 @@ public abstract class AbstractBrooklynMementoPersister 
implements BrooklynMement
         memento.updateEnricherMementos(delta.enrichers());
         memento.updateCatalogItemMementos(delta.catalogItems());
     }
+
+    @Override
+    public void queueDelta(Delta delta) {
+        log.warn("Legacy persister ignoring queued delta: "+delta);
+    }
     
     @Override
     public String getBackingStoreDescription() {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/48ce0df8/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
index d3318b3..afb3d61 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
@@ -47,7 +47,6 @@ import brooklyn.mementos.EntityMemento;
 import brooklyn.mementos.LocationMemento;
 import brooklyn.mementos.PolicyMemento;
 import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.io.FileUtil;
 import brooklyn.util.text.Strings;
 import brooklyn.util.time.Duration;
 import brooklyn.util.time.Time;
@@ -441,6 +440,11 @@ public class BrooklynMementoPersisterToMultiFile 
implements BrooklynMementoPersi
         }
     }
 
+    @Override
+    public void queueDelta(Delta delta) {
+        LOG.warn("Legacy persister ignoring queued delta: "+delta);
+    }
+    
     @VisibleForTesting
     public File getDir() {
         return dir;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/48ce0df8/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
index 81b2cc6..47eefdc 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -110,6 +111,8 @@ public class BrooklynMementoPersisterToObjectStore 
implements BrooklynMementoPer
     private volatile boolean writesShuttingDown = false;
     private StringConfigMap brooklynProperties;
     
+    private List<Delta> queuedDeltas = new 
CopyOnWriteArrayList<BrooklynMementoPersister.Delta>();
+    
     /**
      * Lock used on writes (checkpoint + delta) so that {@link 
#waitForWritesCompleted(Duration)} can block
      * for any concurrent call to complete.
@@ -560,9 +563,18 @@ public class BrooklynMementoPersisterToObjectStore 
implements BrooklynMementoPer
     public void delta(Delta delta, PersistenceExceptionHandler 
exceptionHandler) {
         checkWritesAllowed();
 
+        while (!queuedDeltas.isEmpty()) {
+            Delta extraDelta = queuedDeltas.remove(0);
+            doDelta(extraDelta, exceptionHandler, false);
+        }
+
+        doDelta(delta, exceptionHandler, false);
+    }
+    
+    protected void doDelta(Delta delta, PersistenceExceptionHandler 
exceptionHandler, boolean previouslyQueued) {
         Stopwatch stopwatch = deltaImpl(delta, exceptionHandler);
         
-        if (LOG.isDebugEnabled()) LOG.debug("Checkpointed delta of memento in 
{}: "
+        if (LOG.isDebugEnabled()) LOG.debug("Checkpointed "+(previouslyQueued 
? "previously queued " : "")+"delta of memento in {}: "
                 + "updated {} entities, {} locations, {} policies, {} 
enrichers, {} catalog items; "
                 + "removed {} entities, {} locations, {} policies, {} 
enrichers, {} catalog items",
                     new Object[] {Time.makeTimeStringRounded(stopwatch),
@@ -570,6 +582,11 @@ public class BrooklynMementoPersisterToObjectStore 
implements BrooklynMementoPer
                         delta.removedEntityIds().size(), 
delta.removedLocationIds().size(), delta.removedPolicyIds().size(), 
delta.removedEnricherIds().size(), delta.removedCatalogItemIds().size()});
     }
     
+    @Override
+    public void queueDelta(Delta delta) {
+        queuedDeltas.add(delta);
+    }
+    
     /**
      * Concurrent calls will queue-up (the lock is "fair", which means an 
"approximately arrival-order policy").
      * Current usage is with the {@link PeriodicDeltaChangeListener} so we 
expect only one call at a time.

Reply via email to