This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 58cbca076c42eb06f20a4fb96374a9b21b91da85 Author: Duncan Grant <[email protected]> AuthorDate: Tue Jul 7 21:55:38 2020 +0100 Sensors per deployment --- .../container/entity/helm/HelmEntityImpl.java | 201 ++++++++++----------- .../container/entity/helm/HelmEntityLiveTest.java | 24 ++- 2 files changed, 108 insertions(+), 117 deletions(-) diff --git a/locations/container/src/main/java/org/apache/brooklyn/container/entity/helm/HelmEntityImpl.java b/locations/container/src/main/java/org/apache/brooklyn/container/entity/helm/HelmEntityImpl.java index 3d948fe..01e6a9e 100644 --- a/locations/container/src/main/java/org/apache/brooklyn/container/entity/helm/HelmEntityImpl.java +++ b/locations/container/src/main/java/org/apache/brooklyn/container/entity/helm/HelmEntityImpl.java @@ -21,16 +21,14 @@ package org.apache.brooklyn.container.entity.helm; import com.google.common.base.Functions; import com.google.common.collect.ImmutableList; import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.api.model.apps.DeploymentList; -import io.fabric8.kubernetes.api.model.apps.DoneableDeployment; import io.fabric8.kubernetes.client.*; -import io.fabric8.kubernetes.client.dsl.*; import org.apache.brooklyn.api.location.Location; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.container.location.kubernetes.KubernetesLocation; import org.apache.brooklyn.core.entity.AbstractEntity; import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; +import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.sensor.function.FunctionSensor; import org.apache.brooklyn.feed.function.FunctionFeed; import org.apache.brooklyn.feed.function.FunctionPollConfig; @@ -74,21 +72,49 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity { addKubernetesFeeds(); } - private void connectServiceUpIsRunning() { - Duration period = Duration.FIVE_SECONDS; - serviceUpFeed = FunctionFeed.builder() - .entity(this) - .period(period) - .poll(new FunctionPollConfig<Boolean, Boolean>(Attributes.SERVICE_UP) - .suppressDuplicates(true) - .onException(Functions.constant(Boolean.FALSE)) - .callable(new Callable<Boolean>() { - @Override - public Boolean call() { - return isRunning(); - } - })) - .build(); + @Override + public Integer resize(String deploymentName, Integer desiredSize) { + scaleDeployment(desiredSize, deploymentName); + return desiredSize; + } + + @Override + public void start(Collection<? extends Location> locations) { + addLocations(locations); + doInstall(); + connectSensors(); + } + + @Override + public void stop() { + disconnectSensors(); + deleteHelmDeployment(); + } + + @Override + public void restart() { + stop(); + start(ImmutableList.<Location>of()); + } + + public boolean isRunning() { + String helmNameInstallName = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME); + String namespace = getNamespace(); + ImmutableList<String> command = ImmutableList.<String>of(String.format("helm status %s --namespace %s", helmNameInstallName, namespace)); + OutputStream out = new ByteArrayOutputStream(); + OutputStream err = new ByteArrayOutputStream(); + int exectionResponse = ProcessTool.execProcesses(command, null, null, out, err, ";", false, this); + return 0 == exectionResponse; + } + + public void scaleDeployment(Integer scale, String deploymentName) { + String config = getLocation().getConfig(KubernetesLocation.KUBECONFIG); + KubernetesClient client = getClient(config); + client.apps().deployments().inNamespace(getNamespace()).withName(deploymentName).scale(scale); + } + + protected void disconnectSensors() { + disconnectServiceUpIsRunning(); } private void addKubernetesFeeds() { @@ -106,32 +132,29 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity { .period(Duration.TEN_SECONDS) .build()); -// addFeed(FunctionFeed.builder() -// .entity(this) -// .poll(new FunctionPollConfig<String, Boolean>(DEPLOYMENT_READY) -// .callable(status)) -// .period(Duration.FIVE_SECONDS) -// .build()); - - Callable replicas = getKubeReplicasCallable(); - addFeed(FunctionFeed.builder() - .entity(this) - .poll(new FunctionPollConfig<String, Integer>(REPLICAS) - .callable(replicas)) - .period(Duration.FIVE_SECONDS) - .build()); - - Callable availableReplicas = getKubeReplicasAvailableCallable(); - addFeed(FunctionFeed.builder() - .entity(this) - .poll(new FunctionPollConfig<String, Integer>(AVAILABLE_REPLICAS) - .callable(availableReplicas)) - .period(Duration.FIVE_SECONDS) - .build()); + String config = getLocation().getConfig(KubernetesLocation.KUBECONFIG); + KubernetesClient client = getClient(config); + String helmNameInstallName = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME); + List<Deployment> deployments = getDeployments(client, helmNameInstallName); + + for (Deployment deployment : deployments) { + String sensorName = "helm.deployment." + deployment.getMetadata().getName() + ".replicas"; + addFeed(FunctionFeed.builder() + .entity(this) + .poll(new FunctionPollConfig<String, Integer>(Sensors.newIntegerSensor(sensorName)) + .callable(getKubeReplicasCallable(deployment.getMetadata().getName()))) + .period(Duration.TEN_SECONDS) + .build()); + + addFeed(FunctionFeed.builder() + .entity(this) + .poll(new FunctionPollConfig<String, Integer>(Sensors.newIntegerSensor(sensorName)) + .callable(getKubeReplicasAvailableCallable(deployment.getMetadata().getName()))) + .period(Duration.TEN_SECONDS) + .build()); + } } - - private void addHelmFeed(String command, AttributeSensor<String> sensor) { Callable status = getCallable(command); FunctionPollConfig pollConfig = new FunctionPollConfig<String, String>(sensor) @@ -145,32 +168,28 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity { .build()); } - - protected void disconnectSensors() { - disconnectServiceUpIsRunning(); + private void connectServiceUpIsRunning() { + Duration period = Duration.FIVE_SECONDS; + serviceUpFeed = FunctionFeed.builder() + .entity(this) + .period(period) + .poll(new FunctionPollConfig<Boolean, Boolean>(Attributes.SERVICE_UP) + .suppressDuplicates(true) + .onException(Functions.constant(Boolean.FALSE)) + .callable(new Callable<Boolean>() { + @Override + public Boolean call() { + return isRunning(); + } + })) + .build(); } + private void disconnectServiceUpIsRunning() { serviceUpFeed.stop(); } - @Override - public Integer resize(String deploymentName, Integer desiredSize) { - scaleDeployment(desiredSize, deploymentName); - return desiredSize; - } - - public Integer getCurrentSize() { - return sensors().get(REPLICAS); - } - - @Override - public void start(Collection<? extends Location> locations) { - addLocations(locations); - doInstall(); - connectSensors(); - } - private void doInstall() { String repo_name = getConfig(HelmEntity.REPO_NAME); String repo_url = getConfig(HelmEntity.REPO_URL); @@ -186,11 +205,15 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity { DynamicTasks.queue("install repo", new Runnable() { @Override public void run() { - ImmutableList<String> installHelmTemplateCommand = + ImmutableList<String> setupRepoCommand = ImmutableList.<String>of(buildAddRepoCommand(repo_name, repo_url)); - OutputStream out = new ByteArrayOutputStream(); - OutputStream err = new ByteArrayOutputStream(); - ProcessTool.execProcesses(installHelmTemplateCommand, null, null, out, err, ";", false, this); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ByteArrayOutputStream err = new ByteArrayOutputStream(); + + ProcessTool.execProcesses(setupRepoCommand, null, null, out, err, ";", false, this); + + Tasks.addTagDynamically(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDOUT, out)); + Tasks.addTagDynamically(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDERR, err)); }}); } @@ -215,18 +238,13 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity { } - @Override - public void stop() { - disconnectSensors(); - deleteHelmDeployment(); - } - private void deleteHelmDeployment() { DynamicTasks.queue("stop", new Runnable() { @Override public void run() { String helm_name_install_name = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME); - ImmutableList<String> command = ImmutableList.<String>of(String.format("helm delete %s", helm_name_install_name)); + String namespace = getNamespace(); + ImmutableList<String> command = ImmutableList.<String>of(String.format("helm delete %s --namespace %s", helm_name_install_name, namespace)); OutputStream out = new ByteArrayOutputStream(); OutputStream err = new ByteArrayOutputStream(); ProcessTool.execProcesses(command, null, null, out, err, ";", false, this); @@ -234,22 +252,6 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity { }); } - @Override - public void restart() { - stop(); - start(ImmutableList.<Location>of()); - } - - public boolean isRunning() { - String helmNameInstallName = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME); - String namespace = getNamespace(); - ImmutableList<String> command = ImmutableList.<String>of(String.format("helm status %s --namespace %s", helmNameInstallName, namespace)); - OutputStream out = new ByteArrayOutputStream(); - OutputStream err = new ByteArrayOutputStream(); - int exectionResponse = ProcessTool.execProcesses(command, null, null, out, err, ";", false, this); - return 0 == exectionResponse; - } - private Callable<List<String>> getKubeDeploymentsCallable() { String helmNameInstallName = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME); @@ -266,7 +268,7 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity { }; } - public Callable<String> getCallable(String command) { + private Callable<String> getCallable(String command) { String helmNameInstallName = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME); String namespace = getNamespace(); ImmutableList<String> installHelmTemplateCommand = @@ -283,7 +285,7 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity { }; } - public Callable getKubeDeploymentsReady() { + private Callable getKubeDeploymentsReady() { String helmNameInstallName = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME); String config = getLocation().getConfig(KubernetesLocation.KUBECONFIG); @@ -317,39 +319,30 @@ public class HelmEntityImpl extends AbstractEntity implements HelmEntity { return (KubernetesLocation) getLocations().stream().filter(KubernetesLocation.class::isInstance).findFirst().get(); } - //TODO get rid of this - public Callable getKubeReplicasCallable() { - String helmNameInstallName = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME); + private Callable getKubeReplicasCallable(String deploymentName) { String config = getLocation().getConfig(KubernetesLocation.KUBECONFIG); return new Callable() { @Override public Integer call() throws Exception { KubernetesClient client = getClient(config); - return countReplicas(getDeployments(client, helmNameInstallName)); + return countReplicas(getDeployments(client, deploymentName)); } ; }; } - //TODO get rid of this - public Callable getKubeReplicasAvailableCallable() { - String helmNameInstallName = getConfig(HelmEntity.HELM_DEPLOYMENT_NAME); + private Callable getKubeReplicasAvailableCallable(String deploymentName) { String config = getLocation().getConfig(KubernetesLocation.KUBECONFIG); return new Callable() { @Override public Integer call() throws Exception { KubernetesClient client = getClient(config); - return countAvailableReplicas(getDeployments(client, helmNameInstallName)); + return countAvailableReplicas(getDeployments(client, deploymentName)); } ; }; } - public void scaleDeployment(Integer scale, String deploymentName) { - String config = getLocation().getConfig(KubernetesLocation.KUBECONFIG); - KubernetesClient client = getClient(config); - client.apps().deployments().inNamespace(getNamespace()).withName(deploymentName).scale(scale); - } KubernetesClient getClient(String configFile) { Path configPath = Paths.get(configFile); diff --git a/locations/container/src/test/java/org/apache/brooklyn/container/entity/helm/HelmEntityLiveTest.java b/locations/container/src/test/java/org/apache/brooklyn/container/entity/helm/HelmEntityLiveTest.java index d753175..4b02d04 100644 --- a/locations/container/src/test/java/org/apache/brooklyn/container/entity/helm/HelmEntityLiveTest.java +++ b/locations/container/src/test/java/org/apache/brooklyn/container/entity/helm/HelmEntityLiveTest.java @@ -25,6 +25,7 @@ import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.location.Location; import org.apache.brooklyn.container.location.kubernetes.KubernetesLocation; import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport; import org.apache.brooklyn.test.Asserts; import org.apache.brooklyn.util.collections.MutableMap; @@ -41,6 +42,8 @@ import static org.apache.brooklyn.core.entity.EntityAsserts.assertPredicateEvent public class HelmEntityLiveTest extends BrooklynAppLiveTestSupport { + public static final String PROMETHEUS_TEMPLATE_LOCATION = "~/workspace/charts/stable/prometheus"; + @AfterMethod(alwaysRun = true, timeOut = Asserts.THIRTY_SECONDS_TIMEOUT_MS) @Override public void tearDown() throws Exception { @@ -60,7 +63,7 @@ public class HelmEntityLiveTest extends BrooklynAppLiveTestSupport { @Test(groups = {"Live"}) public void testMultiDeployment() { - HelmEntity andManageChild = newHelmSpec("prometheus", "/Users/duncangrant/workspace/charts/stable/prometheus"); + HelmEntity andManageChild = newHelmSpec("prometheus", PROMETHEUS_TEMPLATE_LOCATION); app.start(newKubernetesLocation()); @@ -98,26 +101,28 @@ public class HelmEntityLiveTest extends BrooklynAppLiveTestSupport { assertAttributeEqualsEventually(andManageChild, HelmEntity.DEPLOYMENT_READY, true); } + //TODO Why is this broken? @Test(groups = {"Live"}) public void testCanScaleCluster() { HelmEntity andManageChild = newHelmSpec("nginx-test", "bitnami/nginx"); app.start(newKubernetesLocation()); - assertAttributeEqualsEventually(andManageChild, HelmEntity.AVAILABLE_REPLICAS, 1); - assertAttributeEqualsEventually(andManageChild, HelmEntity.REPLICAS, 1); + assertAttributeEqualsEventually(andManageChild, Sensors.newIntegerSensor("helm.deployment.nginx-test.replicas"), 1); + assertAttributeEqualsEventually(andManageChild, Sensors.newIntegerSensor("helm.deployment.nginx-test.replicas.available"), 1); andManageChild.resize("nginx-test",3); - assertAttributeEqualsEventually(andManageChild, HelmEntity.AVAILABLE_REPLICAS, 3); - assertAttributeEqualsEventually(andManageChild, HelmEntity.REPLICAS, 3); + assertAttributeEqualsEventually(andManageChild, Sensors.newIntegerSensor("helm.deployment.nginx-test.replicas"), 3); + assertAttributeEqualsEventually(andManageChild, Sensors.newIntegerSensor("helm.deployment.nginx-test.replicas.available"), 3); assertAttributeEqualsEventually(andManageChild, HelmEntity.DEPLOYMENT_READY, true); } + //TODO Why is this broken? @Test(groups = {"Live"}) public void testCanScaleClusterPrometheus() { - HelmEntity andManageChild = newHelmSpec("prometheus", "/Users/duncangrant/workspace/charts/stable/prometheus"); + HelmEntity andManageChild = newHelmSpec("prometheus", PROMETHEUS_TEMPLATE_LOCATION); app.start(newKubernetesLocation()); @@ -140,15 +145,8 @@ public class HelmEntityLiveTest extends BrooklynAppLiveTestSupport { .configure(HelmEntity.HELM_TEMPLATE, helmTemplate)); } - private ImmutableList<Location> newLocalhostLocation() { - return ImmutableList.<Location>of( - app.newLocalhostProvisioningLocation( - ImmutableMap.of(KubernetesLocation.KUBECONFIG, "/Users/duncangrant/.kube/config"))); - } - private Collection<? extends Location> newKubernetesLocation() { Map<String, ?> allFlags = MutableMap.<String, Object>builder() - .put(KubernetesLocation.KUBECONFIG.getName(), "/Users/duncangrant/.kube/config") .put("image", "cloudsoft/centos:7") .build(); KubernetesLocation kubernetesLocation = (KubernetesLocation) mgmt.getLocationRegistry().getLocationManaged("kubernetes", allFlags);
