Repository: camel Updated Branches: refs/heads/master 8fbddbd7f -> 88de1d433
CAMEL-10494 Camel-Kubernetes: Consuming events from nodes Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/88de1d43 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/88de1d43 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/88de1d43 Branch: refs/heads/master Commit: 88de1d433fe92c6a7b9dc410aff4d813c6102727 Parents: 8fbddbd Author: Andrea Cosentino <[email protected]> Authored: Fri Nov 18 11:41:50 2016 +0100 Committer: Andrea Cosentino <[email protected]> Committed: Fri Nov 18 11:41:50 2016 +0100 ---------------------------------------------------------------------- .../kubernetes/KubernetesEndpoint.java | 4 + .../consumer/KubernetesNodesConsumer.java | 118 ++++++++++++++++ .../kubernetes/consumer/common/NodeEvent.java | 47 +++++++ .../consumer/KubernetesNodesConsumerTest.java | 136 +++++++++++++++++++ 4 files changed, 305 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/88de1d43/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java index e520699..2934b64 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java @@ -26,6 +26,7 @@ import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.kubernetes.consumer.KubernetesNamespacesConsumer; +import org.apache.camel.component.kubernetes.consumer.KubernetesNodesConsumer; import org.apache.camel.component.kubernetes.consumer.KubernetesPodsConsumer; import org.apache.camel.component.kubernetes.consumer.KubernetesReplicationControllersConsumer; import org.apache.camel.component.kubernetes.consumer.KubernetesServicesConsumer; @@ -141,6 +142,9 @@ public class KubernetesEndpoint extends DefaultEndpoint { case KubernetesCategory.NAMESPACES: return new KubernetesNamespacesConsumer(this, processor); + + case KubernetesCategory.NODES: + return new KubernetesNodesConsumer(this, processor); default: throw new IllegalArgumentException("The " + category + " consumer category doesn't exist"); http://git-wip-us.apache.org/repos/asf/camel/blob/88de1d43/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumer.java new file mode 100644 index 0000000..f2a1d55 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumer.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.consumer; + +import java.util.concurrent.ExecutorService; + +import io.fabric8.kubernetes.api.model.DoneableNode; +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.NodeList; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.dsl.ClientNonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.ClientResource; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.kubernetes.KubernetesConstants; +import org.apache.camel.component.kubernetes.KubernetesEndpoint; +import org.apache.camel.component.kubernetes.consumer.common.NodeEvent; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KubernetesNodesConsumer extends DefaultConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesNodesConsumer.class); + + private final Processor processor; + private ExecutorService executor; + + public KubernetesNodesConsumer(KubernetesEndpoint endpoint, Processor processor) { + super(endpoint, processor); + this.processor = processor; + } + + @Override + public KubernetesEndpoint getEndpoint() { + return (KubernetesEndpoint) super.getEndpoint(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + executor = getEndpoint().createExecutor(); + + executor.submit(new NodesConsumerTask()); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + LOG.debug("Stopping Kubernetes Nodes Consumer"); + if (executor != null) { + if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); + } else { + executor.shutdownNow(); + } + } + executor = null; + } + + class NodesConsumerTask implements Runnable { + + @Override + public void run() { + ClientNonNamespaceOperation<Node, NodeList, DoneableNode, ClientResource<Node, DoneableNode>> w = getEndpoint().getKubernetesClient().nodes(); + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelKey()) + && ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelValue())) { + w.withLabel(getEndpoint().getKubernetesConfiguration().getLabelKey(), getEndpoint().getKubernetesConfiguration().getLabelValue()); + } + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getResourceName())) { + w.withName(getEndpoint().getKubernetesConfiguration().getResourceName()); + } + w.watch(new Watcher<Node>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + Node resource) { + NodeEvent ne = new NodeEvent(action, resource); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(ne.getNode()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, ne.getAction()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis()); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + + } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/88de1d43/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NodeEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NodeEvent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NodeEvent.java new file mode 100644 index 0000000..9824c43 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/common/NodeEvent.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.consumer.common; + +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.client.Watcher.Action; + +public class NodeEvent { + private io.fabric8.kubernetes.client.Watcher.Action action; + + private Node node; + + public NodeEvent(Action action, Node node) { + this.action = action; + this.node = node; + } + + public io.fabric8.kubernetes.client.Watcher.Action getAction() { + return action; + } + + public void setAction(io.fabric8.kubernetes.client.Watcher.Action action) { + this.action = action; + } + + public Node getNode() { + return node; + } + + public void setNode(Node node) { + this.node = node; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/88de1d43/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumerTest.java new file mode 100644 index 0000000..0d416aa --- /dev/null +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNodesConsumerTest.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.consumer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerPort; +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodSpec; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kubernetes.KubernetesConstants; +import org.apache.camel.component.kubernetes.KubernetesTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.ObjectHelper; +import org.junit.Test; + +public class KubernetesNodesConsumerTest extends KubernetesTestSupport { + + @EndpointInject(uri = "mock:result") + protected MockEndpoint mockResultEndpoint; + + @Test + public void createAndDeletePod() throws Exception { + if (ObjectHelper.isEmpty(authToken)) { + return; + } + + mockResultEndpoint.expectedMessageCount(1); + mockResultEndpoint.expectedHeaderValuesReceivedInAnyOrder(KubernetesConstants.KUBERNETES_EVENT_ACTION, "MODIFIED"); + Exchange ex = template.request("direct:createPod", new Processor() { + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, "default"); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_POD_NAME, "test"); + Map<String, String> labels = new HashMap<String, String>(); + labels.put("this", "rocks"); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_PODS_LABELS, labels); + PodSpec podSpec = new PodSpec(); + podSpec.setHost("172.28.128.4"); + Container cont = new Container(); + cont.setImage("docker.io/jboss/wildfly:latest"); + cont.setName("pippo"); + + List<ContainerPort> containerPort = new ArrayList<ContainerPort>(); + ContainerPort port = new ContainerPort(); + port.setHostIP("0.0.0.0"); + port.setHostPort(8080); + port.setContainerPort(8080); + + containerPort.add(port); + + cont.setPorts(containerPort); + + List<Container> list = new ArrayList<Container>(); + list.add(cont); + + podSpec.setContainers(list); + + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_POD_SPEC, podSpec); + } + }); + + ex = template.request("direct:deletePod", new Processor() { + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_NAMESPACE_NAME, "default"); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_POD_NAME, "test"); + } + }); + + boolean podDeleted = ex.getOut().getBody(Boolean.class); + + assertTrue(podDeleted); + + Thread.sleep(3000); + + mockResultEndpoint.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:list").toF("kubernetes://%s?oauthToken=%s&category=pods&operation=listPods", host, + authToken); + from("direct:listByLabels") + .toF("kubernetes://%s?oauthToken=%s&category=pods&operation=listPodsByLabels", host, authToken); + from("direct:getPod").toF("kubernetes://%s?oauthToken=%s&category=pods&operation=getPod", host, + authToken); + from("direct:createPod").toF("kubernetes://%s?oauthToken=%s&category=pods&operation=createPod", host, + authToken); + from("direct:deletePod").toF("kubernetes://%s?oauthToken=%s&category=pods&operation=deletePod", host, + authToken); + fromF("kubernetes://%s?oauthToken=%s&category=nodes&resourceName=minikube", host, authToken) + .process(new KubernertesProcessor()).to(mockResultEndpoint); + } + }; + } + + public class KubernertesProcessor implements Processor { + @Override + public void process(Exchange exchange) throws Exception { + Message in = exchange.getIn(); + Node node = exchange.getIn().getBody(Node.class); + log.info("Got event with node name: " + node.getMetadata().getName() + " and action " + + in.getHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION)); + } + } +}
