This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 84410a8275f60701347fdd3e5dcef44d72d93104 Author: Alex Heneveld <[email protected]> AuthorDate: Thu Oct 21 21:23:48 2021 +0100 Improve task garbage collection, using task name also. Previously it looked only at task tags to determine which tasks to remember after completion; now it also looks at the task's _name_ and only keeps 10 tasks per entity with a given name (the default for the new brooklyn.gc.maxTasksPerName setting). Useful for scheduled tasks where otherwise we might keep lots, if they don't have distinguishing tags. --- .../mgmt/internal/BrooklynGarbageCollector.java | 121 ++++++++++--- .../mgmt/internal/EntityExecutionManagerTest.java | 189 ++++++++++++++++++--- 2 files changed, 265 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java index f780b5c..5c6f2b7 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java @@ -40,6 +40,7 @@ import java.util.stream.Collectors; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.location.Location; import org.apache.brooklyn.api.mgmt.HasTaskChildren; +import org.apache.brooklyn.api.mgmt.ManagementContext; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; @@ -133,7 +134,12 @@ public class BrooklynGarbageCollector { + "within an execution context (e.g. entity); " + "some broad-brush tags are excluded, and if an entity has multiple tags all tag counts must be full", 50); - + + public static final ConfigKey<Integer> MAX_TASKS_PER_NAME = ConfigKeys.newIntegerConfigKey( + "brooklyn.gc.maxTasksPerName", + "the maximum number of tasks with the same name kept within an execution context (e.g. 
entity)", + 10); + public static final ConfigKey<Integer> MAX_TASKS_PER_ENTITY = ConfigKeys.newIntegerConfigKey( "brooklyn.gc.maxTasksPerEntity", "the maximum number of tasks to be kept for a given entity", @@ -168,7 +174,7 @@ public class BrooklynGarbageCollector { private Duration gcPeriod; private volatile boolean running = true; - + public BrooklynGarbageCollector(BrooklynProperties brooklynProperties, BasicExecutionManager executionManager, BrooklynStorage storage) { this.executionManager = executionManager; this.storage = storage; @@ -330,8 +336,16 @@ public class BrooklynGarbageCollector { } /** - * Deletes old tasks. The age/number of tasks to keep is controlled by fields like + * Deletes old tasks. The age/number of tasks to keep is controlled by fields including * {@link #MAX_TASKS_PER_TAG} and {@link #MAX_TASKS_PER_TAG}. + * + * This works by looking at the "entity" tag(s) [context and target], then at the "non-entity" tags (excluding some such as sub-task etc); + * any (completed) task which has one or more tag in the category, and where all such tags are over capacity + * (with some grace to ignore tasks for which one tag in category is under-capacity), these will get GC'd; + * with oldest first. So it will keep up to 1000 tasks for an entity, and limit of up to 50 for each tag, + * so eg attaching an 'entityId:effectorName' tag means we keep up to 50 instances of each effector call, provided we don't exceed the 1000 global. + * + * (It might be nicer to score, based on age and name uniqueness and activity within an entity. But above works pretty well.) 
*/ @VisibleForTesting public synchronized int gcTasks() { @@ -356,10 +370,13 @@ public class BrooklynGarbageCollector { expireAgedTasks(); expireTransientTasks(); - // now look at overcapacity tags, non-entity tags first - + // now look at overcapacity tags, names, then non-entity tags first + Set<Object> taskTags = executionManager.getTaskTags(); - + + int deletedCount = 0; + deletedCount += expireOverCapacityNamesInCategory(taskTags, TagCategory.ENTITY); + int maxTasksPerEntity = brooklynProperties.getConfig(MAX_TASKS_PER_ENTITY); int maxTasksPerTag = brooklynProperties.getConfig(MAX_TASKS_PER_TAG); @@ -392,10 +409,9 @@ public class BrooklynGarbageCollector { } } - int deletedCount = 0; deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false); deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true); - + // if expensive we could optimize task GC here to avoid repeated lookups by // counting all expired above (not just prev two lines) and skipping if none // but that seems unlikely @@ -416,14 +432,15 @@ public class BrooklynGarbageCollector { protected static boolean isTagIgnoredForGc(Object tag) { if (tag == null) return true; + if (tag.equals(ManagementContextInternal.EFFECTOR_TAG)) return true; if (tag.equals(ManagementContextInternal.SUB_TASK_TAG)) return true; if (tag.equals(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) return true; if (tag.equals(ManagementContextInternal.TRANSIENT_TASK_TAG)) return true; - if (tag instanceof WrappedStream) { - return true; - } - + + if (tag instanceof ManagementContext) return true; + if (tag instanceof WrappedStream) return true; + return false; } @@ -550,7 +567,7 @@ public class BrooklynGarbageCollector { } - /** expires tasks which are over-capacity in all their non-entity tag categories, returned count */ + /** expires tasks which are over-capacity in all their 
non-entity or entity (target, context) tag categories, returned count */ protected int expireOverCapacityTagsInCategory(Map<Object, AtomicInteger> taskTagsInCategoryOverCapacity, Map<Object, AtomicInteger> taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) { if (emptyFilterNeeded) { // previous run may have decremented counts @@ -629,25 +646,29 @@ public class BrooklynGarbageCollector { LOG.debug("brooklyn-gc detected " + taskTagsInCategoryOverCapacity.size() + " " + category + " " + "tag(s) over capacity, expiring old tasks; " + tasksToConsiderDeleting.size() + " tasks under consideration; categories are: " - + taskTagsInCategoryOverCapacity + "; including " + tasksToConsiderDeleting); + + taskTagsInCategoryOverCapacity + "; including " + tasksToLog); } // now try deleting tasks which are overcapacity for each (non-entity) tag - int deleted = 0; + Set<Task<?>> deleted = MutableSet.of(); for (Task<?> task: tasksToConsiderDeleting) { - boolean delete = true; + boolean delete = false; for (Object tag: task.getTags()) { - if (!category.acceptsTag(tag)) + if (!category.acceptsTag(tag)) { + // ignore this tag, not right for the category continue; + } if (taskTagsInCategoryOverCapacity.get(tag)==null) { // no longer over capacity in this tag delete = false; break; } + // has at least one tag in the category, and all such tags are overcapacity + delete = true; } if (delete) { // delete this and update overcapacity info - deleted++; + deleted.add(task); executionManager.deleteTask(task); for (Object tag: task.getTags()) { AtomicInteger counter = taskAllTagsOverCapacity.get(tag); @@ -662,9 +683,67 @@ public class BrooklynGarbageCollector { } if (LOG.isDebugEnabled()) - LOG.debug("brooklyn-gc deleted "+deleted+" tasks in over-capacity " + category+" tag categories; " - + "capacities now: " + taskTagsInCategoryOverCapacity); - return deleted; + LOG.debug("brooklyn-gc deleted "+deleted.size()+" tasks in over-capacity " + category+" tag categories; " + + 
"capacities now: " + taskTagsInCategoryOverCapacity+"; deleted tasks: "+ + deleted.stream().map(Task::getId).collect(Collectors.joining(","))); + return deleted.size(); + } + + protected int expireOverCapacityNamesInCategory(Set<Object> taskTags, TagCategory category) { + List<Object> entityTags = taskTags.stream().filter(tag -> category.acceptsTag(tag)).collect(Collectors.toList()); + Integer maxPerName = brooklynProperties.getConfig(MAX_TASKS_PER_NAME); + if (maxPerName==null || maxPerName<=0) return 0; + Set<Task<?>> tasksToDelete = MutableSet.of(); + + try { + for (Object entityTag: entityTags) { + Set<Task<?>> tasks = executionManager.getTasksWithTag(entityTag); + Map<String,Set<Task<?>>> tasksByName = MutableMap.of(); + for (Task<?> task: tasks) { + if (!task.isDone(true)) continue; + tasksByName.compute(task.getDisplayName(), (key,set) -> { + if (set==null) set = MutableSet.of(); + set.add(task); + return set; + }); + } + + List<Entry<String,Set<Task<?>>>> overCapacityNames = tasksByName.entrySet().stream().filter(entry -> entry.getValue().size() > maxPerName).collect(Collectors.toList()); + if (!overCapacityNames.isEmpty()) { + LOG.debug("brooklyn-gc detected tasks exceeding max per-name for entity "+entityTag+"; collecting for deletion: " + + overCapacityNames.stream().map(entry -> entry.getKey()+"("+entry.getValue().size()+")").collect(Collectors.joining(", "))); + } + overCapacityNames.forEach(entry -> { + List<Task<?>> list = MutableList.copyOf(entry.getValue()); + Collections.sort(list, TASKS_NEWEST_FIRST_COMPARATOR); + list.stream().skip(maxPerName).forEach(tasksToDelete::add); + }); + } + + } catch (ConcurrentModificationException e) { + // do CME's happen with these data structures? 
+ // if so, let's just delete what we've found so far + LOG.debug("Got CME inspecting tasks by name to delete; ignoring: "+e); + } + + + if (tasksToDelete.isEmpty()) { + return 0; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("brooklyn-gc detected " + + tasksToDelete.size() + " tasks with exceeding max per-name and will be deleted: "+ + tasksToDelete.stream().map(Task::getId).collect(Collectors.joining(","))); + } + + // now try deleting tasks which are overcapacity for each (non-entity) tag + for (Task<?> task: tasksToDelete) { + // delete this and update overcapacity info + executionManager.deleteTask(task); + } + + return tasksToDelete.size(); } protected int expireIfOverCapacityGlobally() { diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java index 6168942..8ab3ead 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/EntityExecutionManagerTest.java @@ -18,6 +18,11 @@ */ package org.apache.brooklyn.core.mgmt.internal; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.brooklyn.core.entity.Dumper; +import org.apache.brooklyn.feed.function.FunctionFeed; +import org.apache.brooklyn.feed.function.FunctionPollConfig; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -146,29 +151,38 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport { }}); } + static Set<String> SYSTEM_TASK_WORDS = ImmutableSet.of("initialize model", "entity init", "management start"); + static Set<Task<?>> removeSystemTasks(Iterable<Task<?>> tasks) { Set<Task<?>> result = MutableSet.of(); for (Task<?> t: tasks) { if (t instanceof ScheduledTask) continue; if 
(t.getTags().contains(BrooklynTaskTags.SENSOR_TAG)) continue; - if (t.getDisplayName().contains("Validating")) continue; + if (SYSTEM_TASK_WORDS.stream().anyMatch(t.getDisplayName().toLowerCase()::contains)) continue; result.add(t); } return result; } // Needed because of https://issues.apache.org/jira/browse/BROOKLYN-401 - protected void assertTaskMaxCountForEntityEventually(final Entity entity, final int expectedMaxCount) { + protected void assertNonSystemTaskCountForEntityEventuallyEquals(final Entity entity, final int expectedCount) { + assertNonSystemTaskCountForEntityEventuallyIsInRange(entity, expectedCount, expectedCount); + } + + protected void assertNonSystemTaskCountForEntityEventuallyIsInRange(final Entity entity, final int expectedMinCount, final int expectedMaxCount) { // Dead task (and initialization task) should have been GC'd on completion. // However, the GC'ing happens in a listener, executed in a different thread - the task.get() // doesn't block for it. Therefore can't always guarantee it will be GC'ed by now. - Asserts.succeedsEventually(new Runnable() { + Asserts.succeedsEventually(ImmutableMap.of("timeout", Duration.seconds(3)), new Runnable() { @Override public void run() { forceGc(); Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity) ); - Assert.assertTrue(tasks.size() <= expectedMaxCount, - "Expected tasks count max of " + expectedMaxCount + ". Tasks were "+tasks); + Assert.assertTrue(tasks.size() >= expectedMinCount && tasks.size() <= expectedMaxCount, + "Expected tasks count [" + expectedMinCount+","+expectedMaxCount + "]. 
Tasks were:\n"+tasks.stream().map(t -> ""+t+": "+t.getTags()+"\n").collect(Collectors.joining())); }}); + + Collection<Task<?>> tasks = removeSystemTasks( BrooklynTaskTags.getTasksInEntityContext(((EntityInternal)entity).getManagementContext().getExecutionManager(), entity) ); + LOG.info("Expected tasks count [" + expectedMinCount+","+expectedMaxCount + "] satisfied; tasks were:\n"+tasks.stream().map(t -> ""+t+": "+t.getTags()+"\n").collect(Collectors.joining())); } public void testGetTasksAndGcBoringTags() throws Exception { @@ -200,7 +214,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport { stopCondition.set(true); - assertTaskMaxCountForEntityEventually(e, 2); + assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 0, 2); } public void testGcTaskAtEntityLimit() throws Exception { @@ -224,15 +238,24 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport { forceGc(); stopCondition.set(true); - assertTaskMaxCountForEntityEventually(app, 2); - assertTaskMaxCountForEntityEventually(e, 2); + assertNonSystemTaskCountForEntityEventuallyIsInRange(app, 0, 2); + assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 0, 2); + + for (int count=0; count<5; count++) + runEmptyTaskWithNameAndTags(e, "task-e-"+count, ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag"); + for (int count=0; count<5; count++) + runEmptyTaskWithNameAndTags(app, "task-app-"+count, ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag"); + + forceGc(); + assertNonSystemTaskCountForEntityEventuallyEquals(app, 2); + assertNonSystemTaskCountForEntityEventuallyEquals(e, 2); } public void testGcTaskWithTagAndEntityLimit() throws Exception { TestEntity e = app.createAndManageChild(EntitySpec.create(TestEntity.class)); ((BrooklynProperties)app.getManagementContext().getConfig()).put( - BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 6); + BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 8); 
((BrooklynProperties)app.getManagementContext().getConfig()).put( BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2); @@ -254,13 +277,17 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport { runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag", "another-tag-e"); runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "boring-tag", "another-tag-e"); // should keep both the above - + runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag"); runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag"); + runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-e"); + runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-e"); Time.sleep(Duration.ONE_MILLISECOND); runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag"); - // should keep the below since they have unique tags, but remove one of the e tasks above + + // should keep the below since they have unique tags, plus 4 to 6 of the above, depending which of boring-tags are kept, but might remove 1 of the above runEmptyTaskWithNameAndTags(e, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag", "and-another-tag"); + runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-app", "another-tag"); runEmptyTaskWithNameAndTags(app, "task-"+(count++), ManagementContextInternal.NON_TRANSIENT_TASK_TAG, "another-tag-app", "another-tag"); @@ -268,36 +295,54 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport { forceGc(); stopCondition.set(true); - assertTaskMaxCountForEntityEventually(e, 6); - 
assertTaskMaxCountForEntityEventually(app, 3); - + assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 4, 7); + assertNonSystemTaskCountForEntityEventuallyIsInRange(app, 2, 3); + // now with a lowered limit, we should remove one more e ((BrooklynProperties)app.getManagementContext().getConfig()).put( BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 5); - assertTaskMaxCountForEntityEventually(e, 5); + forceGc(); + assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 4, 5); } public void testGcDynamicTaskAtNormalTagLimit() throws Exception { + TestEntity e = doTestGcDynamicTaskAtNormalTagLimit(false); + // can go to zero if just one tag, shared by the transient flooding tasks + assertNonSystemTaskCountForEntityEventuallyIsInRange(e, 0, 2); + } + + public void testGcDynamicTaskAtNormalTagLimitWithExtraTag() throws Exception { + TestEntity e = doTestGcDynamicTaskAtNormalTagLimit(true); + // should keep two of our task-N tasks if that has a unique tag + assertNonSystemTaskCountForEntityEventuallyEquals(e, 2); + } + + public TestEntity doTestGcDynamicTaskAtNormalTagLimit(boolean addExtraTag) throws Exception { TestEntity e = app.createAndManageChild(EntitySpec.create(TestEntity.class)); - - ((BrooklynProperties)app.getManagementContext().getConfig()).put( - BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2); + + ((BrooklynProperties) app.getManagementContext().getConfig()).put( + BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2); AtomicBoolean stopCondition = new AtomicBoolean(); scheduleRecursiveTemporaryTask(stopCondition, e, "foo"); scheduleRecursiveTemporaryTask(stopCondition, e, "foo"); - for (int count=0; count<5; count++) { - TaskBuilder<Object> tb = Tasks.builder().displayName("task-"+count).dynamic(true).body(new Runnable() { @Override public void run() {}}) - .tag(ManagementContextInternal.NON_TRANSIENT_TASK_TAG).tag("foo"); - ((EntityInternal)e).getExecutionContext().submit(tb.build()).getUnchecked(); + for (int count = 0; count < 5; count++) { + 
TaskBuilder<Object> tb = Tasks.builder().displayName("task-" + count).dynamic(true).body(new Runnable() { + @Override + public void run() { + } + }) + .tag(ManagementContextInternal.NON_TRANSIENT_TASK_TAG).tag("foo"); + if (addExtraTag) tb.tag("bar"); + ((EntityInternal) e).getExecutionContext().submit(tb.build()).getUnchecked(); } // Makes sure there's a GC while the transient tasks are running forceGc(); stopCondition.set(true); - assertTaskMaxCountForEntityEventually(e, 2); + return e; } public void testUnmanagedEntityCanBeGcedEvenIfPreviouslyTagged() throws Exception { @@ -426,7 +471,7 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport { int maxNumTasks = 2; BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty(); brooklynProperties.put(BrooklynGarbageCollector.GC_PERIOD, Duration.ONE_SECOND); - brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_TAG, 2); + brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_TAG, maxNumTasks); replaceManagementContext(LocalManagementContextForTests.newInstance(brooklynProperties)); setUpApp(); @@ -467,6 +512,102 @@ public class EntityExecutionManagerTest extends BrooklynAppUnitTestSupport { }}); } + static class IncrementingCallable implements Callable<Integer> { + private final AtomicInteger next = new AtomicInteger(0); + + @Override public Integer call() { + return next.getAndIncrement(); + } + } + + @Test(groups={"Integration"}) + public void testEffectorTasksTwoEntitiesPreferByName() throws Exception { + int maxNumTasksPerName = 4; + int maxNumTasksPerTag = 5; + int maxNumTasksPerEntity = 15; // no more than 5 of each + BrooklynProperties brooklynProperties = BrooklynProperties.Factory.newEmpty(); + brooklynProperties.put(BrooklynGarbageCollector.GC_PERIOD, Duration.seconds(3)); + brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_TAG, maxNumTasksPerTag); + brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_ENTITY, 
maxNumTasksPerEntity); + brooklynProperties.put(BrooklynGarbageCollector.MAX_TASKS_PER_NAME, maxNumTasksPerName); + + replaceManagementContext(LocalManagementContextForTests.newInstance(brooklynProperties)); + setUpApp(); + final TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + final TestEntity entity2 = app.createAndManageChild(EntitySpec.create(TestEntity.class)); + + List<Task<?>> tasks = Lists.newArrayList(); + + Set<Task<?>> storedTasks1 = app.getManagementContext().getExecutionManager().getTasksWithAnyTag( app.getManagementContext().getExecutionManager().getTaskTags() ); + String storedTasks1Str = storedTasks1.stream().map(EntityExecutionManagerTest.this::taskToVerboseString).collect(Collectors.joining("\n")); + LOG.info("TASKS BEFORE RUN:\n"+storedTasks1Str); + + FunctionFeed feed = FunctionFeed.builder() + .entity(entity) + .poll(new FunctionPollConfig<Integer, Integer>(TestEntity.SEQUENCE) + .period(Duration.millis(20)) + .callable(new IncrementingCallable()) + //.onSuccess((Function<Object,Integer>)(Function)Functions.identity())) + ) + .build(); + + for (int i = 0; i < (2*maxNumTasksPerEntity+1); i++) { + Task<?> task = entity.invoke(TestEntity.MY_EFFECTOR, ImmutableMap.<String,Object>of()); + task.get(); + tasks.add(task); + // see testEffectorTasksGcedForMaxPerTag + Thread.sleep(10); + } + + for (int i = 0; i < (4*maxNumTasksPerEntity+1); i++) { + Task<?> task = entity.invoke(TestEntity.IDENTITY_EFFECTOR, ImmutableMap.<String,Object>of("arg", "id-"+i)); + task.get(); + tasks.add(task); + entity2.invoke(TestEntity.IDENTITY_EFFECTOR, ImmutableMap.<String,Object>of("arg", "id-"+i)); + Thread.sleep(10); + } + + // and add some context-only (the above have a target entity, so don't interfere with the below): + + for (int i = 0; i < (2*maxNumTasksPerEntity+1); i++) { + entity.getExecutionContext().submit( + Tasks.fail("failure-flood-1", null)).blockUntilEnded(); + } + // normally flood-2 will remove the flood 1 + for (int 
i = 0; i < (2*maxNumTasksPerEntity+1); i++) { + entity.getExecutionContext().submit( + Tasks.fail("failure-flood-2", null)).blockUntilEnded(); + } + + Dumper.dumpInfo(app); + + // Should initially have all tasks + feed.stop(); + + + // oldest should be GC'ed to leave only maxNumTasks + Set<Task<?>> storedTasks2 = app.getManagementContext().getExecutionManager().getTasksWithAnyTag( app.getManagementContext().getExecutionManager().getTaskTags() ); + String storedTasks2Str = storedTasks2.stream().map(EntityExecutionManagerTest.this::taskToVerboseString).collect(Collectors.joining("\n")); + LOG.info("TASKS AFTER RUN:\n"+storedTasks2Str); + + ((LocalManagementContext)mgmt).getGarbageCollector().gcIteration(); + + Set<Task<?>> storedTasks3 = app.getManagementContext().getExecutionManager().getTasksWithAnyTag( app.getManagementContext().getExecutionManager().getTaskTags() ); + String storedTasks3Str = storedTasks3.stream().map(EntityExecutionManagerTest.this::taskToVerboseString).collect(Collectors.joining("\n")); + LOG.info("TASKS AFTER GC:\n"+storedTasks3Str); + + assertTrue(!storedTasks3.containsAll(storedTasks2), "some tasks should have been GC'd"); + assertTrue(storedTasks3.size() <= maxNumTasksPerEntity*2 /* number of TestEntity instances */ *2 /* target and context */ + 10 /* grace for tasks on the app root node */, "too many tasks: "+storedTasks3.size()); + // and should keep some in each category + Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("EFFECTOR@"+entity.getId()+":myEffector")).count(), n -> n==maxNumTasksPerName); + Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("EFFECTOR@"+entity.getId()+":identityEffector")).count(), n -> n==maxNumTasksPerName); + Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("EFFECTOR@"+entity2.getId()+":identityEffector")).count(), n -> n==maxNumTasksPerName); + Asserts.assertThat(storedTasks3.stream().filter(t -> 
taskToVerboseString(t).contains("test.sequence")).count(), n -> n>0 && n<=maxNumTasksPerName + 3); // might be still running + Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("failure-flood-1")).count(), n -> n==maxNumTasksPerName); + Asserts.assertThat(storedTasks3.stream().filter(t -> taskToVerboseString(t).contains("failure-flood-2")).count(), n -> n==maxNumTasksPerName); + } + + private String taskToVerboseString(Task<?> t) { return MoreObjects.toStringHelper(t) .add("id", t.getId())
