This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 35ca9705cb23f428e4e056c97a46eba56bc2e047
Author: Alex Heneveld <[email protected]>
AuthorDate: Thu Jun 8 15:11:15 2023 +0100

    test workflow size
---
 .../brooklyn/core/workflow/WorkflowSizeTest.java   | 133 +++++++++++++++++++++
 1 file changed, 133 insertions(+)

diff --git 
a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSizeTest.java 
b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSizeTest.java
new file mode 100644
index 0000000000..683256f0ca
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowSizeTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.workflow;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.effector.Effectors;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.resolve.jackson.BeanWithTypeUtils;
+import org.apache.brooklyn.core.test.BrooklynMgmtUnitTestSupport;
+import org.apache.brooklyn.entity.stock.BasicApplication;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.FileOutputStream;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+import java.util.Map;
+
+public class WorkflowSizeTest extends BrooklynMgmtUnitTestSupport {
+
+    private static final Logger log = 
LoggerFactory.getLogger(WorkflowSizeTest.class);
+
+    private BasicApplication app;
+
+    protected void createAppWithEffector(List<?> steps) {
+        WorkflowBasicTest.addWorkflowStepTypes(mgmt);
+
+        if (this.app!=null) throw new IllegalStateException("Already have an 
app");
+        this.app = 
mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class));
+        WorkflowEffector eff = new WorkflowEffector(ConfigBag.newInstance()
+                .configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflow")
+                .configure(WorkflowEffector.EFFECTOR_PARAMETER_DEFS, 
MutableMap.of("param", null))
+                .configure(WorkflowEffector.STEPS, (List) steps)
+        );
+        eff.apply((EntityLocal)app);
+    }
+
+    @Test
+    public void testSizeOfAllSensors() {
+        createAppWithEffector(MutableList.of(
+                "let pc = ${param}",
+                "let map myMap = {}",
+                "transform param | prepend hello-",
+                "let myMap.a = ${param}",
+                "let myMap.b = ${output}",
+                "return ${myMap}"
+        ));
+
+        String sampleData = "sample data for testing something big\n";
+
+        app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), 
MutableMap.of("param", sampleData)).getUnchecked();
+
+        Map<String, Integer> sizes = getSensorSizes();
+        sizes.forEach((k,v) -> { log.info("Sensor "+k+": "+v); });
+
+        Asserts.assertThat(sizes.values().stream().reduce(0, (v0,v1)->v0+v1), 
result -> result < 10*1000);
+
+
+        app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), 
MutableMap.of("param", sampleData)).getUnchecked();
+        sizes = getSensorSizes();
+        sizes.forEach((k,v) -> { log.info("Sensor "+k+": "+v); });
+        Asserts.assertThat(sizes.values().stream().reduce(0, (v0,v1)->v0+v1), 
result -> result < 20*1000);
+
+
+        // 100k payload now -> bumps sensor size from 5k to 3MB (before any 
optimization)
+        for (int i=0; i<1000; i++) {
+            for (int j=0; j<10; j++) sampleData += "0123456789";
+            sampleData += "\n";
+        }
+        app.invoke(app.getEntityType().getEffectorByName("myWorkflow").get(), 
MutableMap.of("param", sampleData)).getUnchecked();
+        sizes = getSensorSizes();
+        sizes.forEach((k,v) -> { log.info("Sensor "+k+": "+v); });
+        Asserts.assertThat(sizes.values().stream().reduce(0, (v0,v1)->v0+v1), 
result -> result > 100*1000);
+    }
+
+    protected Map<String,Integer> getSensorSizes() {
+        //Dumper.dumpInfo(app);
+        Map<String,Integer> sizes = MutableMap.of();
+        for (int retryWhileCME=0; ; retryWhileCME++) {
+            try {
+                sizes.clear();
+                app.sensors().getAll().forEach((k, v) -> {
+                    try {
+                        sizes.put(k.getName(), 
BeanWithTypeUtils.newMapper(mgmt, false, null, 
false).writeValueAsString(v).length());
+                    } catch (JsonProcessingException e) {
+                        throw Exceptions.propagate(e);
+                    }
+                });
+                break;
+            } catch (Exception e) {
+                boolean allowedToRetry = false;
+                allowedToRetry |= Exceptions.getFirstThrowableOfType(e, 
ConcurrentModificationException.class)!=null;
+                allowedToRetry |= Exceptions.getFirstThrowableOfType(e, 
NullPointerException.class)!=null;
+                if (allowedToRetry && retryWhileCME<10) {
+                    log.info("Serializing sensors failed; will retry: "+e);
+                    Time.sleep(100);
+                    continue;
+                }
+                throw Exceptions.propagate(e);
+            }
+        }
+        return sizes;
+    }
+}

Reply via email to