CAMEL-11331: Implemented KubernetesClusterService
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/45481262 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/45481262 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/45481262 Branch: refs/heads/master Commit: 45481262c44d4e7caa4749725e01687a53916668 Parents: 9fc6d0b Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Fri Jun 30 17:42:33 2017 +0200 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Tue Aug 8 16:39:43 2017 +0200 ---------------------------------------------------------------------- components/camel-kubernetes/pom.xml | 6 +- .../kubernetes/AbstractKubernetesEndpoint.java | 53 +--- .../kubernetes/KubernetesConfiguration.java | 15 +- .../component/kubernetes/KubernetesHelper.java | 98 +++++++ .../kubernetes/ha/KubernetesClusterService.java | 151 +++++++++++ .../kubernetes/ha/KubernetesClusterView.java | 168 ++++++++++++ .../ha/lock/KubernetesClusterEvent.java | 46 ++++ .../ha/lock/KubernetesClusterEventHandler.java | 27 ++ .../ha/lock/KubernetesLeaderMonitor.java | 256 +++++++++++++++++++ .../ha/lock/KubernetesLeadershipController.java | 211 +++++++++++++++ .../ha/lock/KubernetesLockConfiguration.java | 153 +++++++++++ .../ha/lock/KubernetesMembersMonitor.java | 239 +++++++++++++++++ 12 files changed, 1368 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/pom.xml b/components/camel-kubernetes/pom.xml index e5409c8..c444068 100644 --- a/components/camel-kubernetes/pom.xml +++ b/components/camel-kubernetes/pom.xml @@ -44,12 +44,14 @@ <dependency> <groupId>io.fabric8</groupId> <artifactId>kubernetes-client</artifactId> - <version>${kubernetes-client-version}</version> + <version>2.3-SNAPSHOT</version> + <!--<version>${kubernetes-client-version}</version>--> </dependency> <dependency> <groupId>io.fabric8</groupId> <artifactId>openshift-client</artifactId> - <version>${kubernetes-client-version}</version> + <version>2.3-SNAPSHOT</version> + <!--<version>${kubernetes-client-version}</version>--> </dependency> <!-- testing --> <dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java index f48bf6d..b7aeb37 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java @@ -18,14 +18,10 @@ package org.apache.camel.component.kubernetes; import java.util.concurrent.ExecutorService; -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.ConfigBuilder; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.UriParam; -import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +50,7 @@ public abstract class AbstractKubernetesEndpoint extends DefaultEndpoint { @Override protected void doStart() throws Exception { super.doStart(); - client = configuration.getKubernetesClient() != null ? configuration.getKubernetesClient() : createKubernetesClient(); + client = KubernetesHelper.getKubernetesClient(configuration); } @Override @@ -80,52 +76,5 @@ public abstract class AbstractKubernetesEndpoint extends DefaultEndpoint { return configuration; } - private KubernetesClient createKubernetesClient() { - LOG.debug("Create Kubernetes client with the following Configuration: " + configuration.toString()); - ConfigBuilder builder = new ConfigBuilder(); - builder.withMasterUrl(configuration.getMasterUrl()); - if ((ObjectHelper.isNotEmpty(configuration.getUsername()) - && ObjectHelper.isNotEmpty(configuration.getPassword())) - && ObjectHelper.isEmpty(configuration.getOauthToken())) { - builder.withUsername(configuration.getUsername()); - builder.withPassword(configuration.getPassword()); - } - if (ObjectHelper.isNotEmpty(configuration.getOauthToken())) { - builder.withOauthToken(configuration.getOauthToken()); - } - if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) { - builder.withCaCertData(configuration.getCaCertData()); - } - if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) { - builder.withCaCertFile(configuration.getCaCertFile()); - } - if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) { - builder.withClientCertData(configuration.getClientCertData()); - } - if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) { - builder.withClientCertFile(configuration.getClientCertFile()); - } - if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) { - builder.withApiVersion(configuration.getApiVersion()); - } - if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) { - builder.withClientKeyAlgo(configuration.getClientKeyAlgo()); - } - if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) { - builder.withClientKeyData(configuration.getClientKeyData()); - } - if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) { - builder.withClientKeyFile(configuration.getClientKeyFile()); - } - if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) { - builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase()); - } - if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) { - builder.withTrustCerts(configuration.getTrustCerts()); - } - - Config conf = builder.build(); - return new DefaultKubernetesClient(conf); - } } http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java index 89d0d9a..271ef71 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java @@ -19,13 +19,14 @@ package org.apache.camel.component.kubernetes; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; import org.apache.camel.spi.UriPath; @UriParams -public class KubernetesConfiguration { +public class KubernetesConfiguration implements Cloneable { @UriPath @Metadata(required = "true") @@ -395,6 +396,18 @@ public class KubernetesConfiguration { this.resourceName = resourceName; } + // **************************************** + // Copy + // **************************************** + + public KubernetesConfiguration copy() { + try { + return (KubernetesConfiguration) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } + @Override public String toString() { return "KubernetesConfiguration [masterUrl=" + masterUrl + ", category=" + category + ", kubernetesClient=" http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesHelper.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesHelper.java new file mode 100644 index 0000000..62235ad --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesHelper.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes; + +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; + +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper moethods for Kubernetes resources. + */ +public final class KubernetesHelper { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesHelper.class); + + private KubernetesHelper() { + } + + public static KubernetesClient getKubernetesClient(KubernetesConfiguration configuration) { + if (configuration.getKubernetesClient() != null) { + return configuration.getKubernetesClient(); + } else if (configuration.getMasterUrl() != null) { + return createKubernetesClient(configuration); + } else { + LOG.info("Creating default kubernetes client without applying configuration"); + return new DefaultKubernetesClient(); + } + } + + private static KubernetesClient createKubernetesClient(KubernetesConfiguration configuration) { + LOG.debug("Create Kubernetes client with the following Configuration: " + configuration.toString()); + + ConfigBuilder builder = new ConfigBuilder(); + builder.withMasterUrl(configuration.getMasterUrl()); + if ((ObjectHelper.isNotEmpty(configuration.getUsername()) + && ObjectHelper.isNotEmpty(configuration.getPassword())) + && ObjectHelper.isEmpty(configuration.getOauthToken())) { + builder.withUsername(configuration.getUsername()); + builder.withPassword(configuration.getPassword()); + } + if (ObjectHelper.isNotEmpty(configuration.getOauthToken())) { + builder.withOauthToken(configuration.getOauthToken()); + } + if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) { + builder.withCaCertData(configuration.getCaCertData()); + } + if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) { + builder.withCaCertFile(configuration.getCaCertFile()); + } + if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) { + builder.withClientCertData(configuration.getClientCertData()); + } + if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) { + builder.withClientCertFile(configuration.getClientCertFile()); + } + if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) { + builder.withApiVersion(configuration.getApiVersion()); + } + if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) { + builder.withClientKeyAlgo(configuration.getClientKeyAlgo()); + } + if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) { + builder.withClientKeyData(configuration.getClientKeyData()); + } + if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) { + builder.withClientKeyFile(configuration.getClientKeyFile()); + } + if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) { + builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase()); + } + if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) { + builder.withTrustCerts(configuration.getTrustCerts()); + } + + Config conf = builder.build(); + return new DefaultKubernetesClient(conf); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java new file mode 100644 index 0000000..6d87d48 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha; + +import java.net.InetAddress; +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.kubernetes.KubernetesConfiguration; +import org.apache.camel.component.kubernetes.ha.lock.KubernetesLockConfiguration; +import org.apache.camel.impl.ha.AbstractCamelClusterService; +import org.apache.camel.util.ObjectHelper; + +/** + * A Kubernetes based cluster service leveraging Kubernetes optimistic locks on resources (specifically ConfigMaps). + */ +public class KubernetesClusterService extends AbstractCamelClusterService<KubernetesClusterView> { + + public static final String DEFAULT_CONFIGMAP_NAME = "leaders"; + + private KubernetesConfiguration configuration; + + private KubernetesLockConfiguration lockConfiguration; + + public KubernetesClusterService() { + this.configuration = new KubernetesConfiguration(); + this.lockConfiguration = new KubernetesLockConfiguration(); + } + + public KubernetesClusterService(KubernetesConfiguration configuration) { + this.configuration = configuration.copy(); + this.lockConfiguration = new KubernetesLockConfiguration(); + } + + public KubernetesClusterService(CamelContext camelContext, KubernetesConfiguration configuration) { + super(null, camelContext); + this.configuration = configuration.copy(); + this.lockConfiguration = new KubernetesLockConfiguration(); + } + + @Override + protected KubernetesClusterView createView(String namespace) throws Exception { + KubernetesLockConfiguration lockConfig = configWithGroupNameAndDefaults(namespace); + return new KubernetesClusterView(this, configuration, lockConfig); + } + + protected KubernetesLockConfiguration configWithGroupNameAndDefaults(String groupName) { + KubernetesLockConfiguration config = this.lockConfiguration.copy(); + + config.setGroupName(ObjectHelper.notNull(groupName, "groupName")); + + // Check defaults (Namespace and podName can be null) + if (config.getConfigMapName() == null) { + config.setConfigMapName(DEFAULT_CONFIGMAP_NAME); + } + if (config.getPodName() == null) { + config.setPodName(System.getenv("HOSTNAME")); + if (config.getPodName() == null) { + try { + config.setPodName(InetAddress.getLocalHost().getHostName()); + } catch (Exception e) { + throw new RuntimeCamelException("Unable to determine pod name", e); + } + } + } + + return config; + } + + public String getMasterUrl() { + return configuration.getMasterUrl(); + } + + /** + * Set the URL of the Kubernetes master (read from Kubernetes client properties by default). + */ + public void setMasterUrl(String masterUrl) { + configuration.setMasterUrl(masterUrl); + } + + public String getKubernetesNamespace() { + return this.lockConfiguration.getKubernetesResourcesNamespace(); + } + + /** + * Set the name of the Kubernetes namespace containing the pods and the configmap (autodetected by default) + */ + public void setKubernetesNamespace(String kubernetesNamespace) { + this.lockConfiguration.setKubernetesResourcesNamespace(kubernetesNamespace); + } + + public String getConfigMapName() { + return this.lockConfiguration.getConfigMapName(); + } + + /** + * Set the name of the ConfigMap used to do optimistic locking (defaults to 'leaders'). + */ + public void setConfigMapName(String configMapName) { + this.lockConfiguration.setConfigMapName(configMapName); + } + + public String getPodName() { + return this.lockConfiguration.getPodName(); + } + + /** + * Set the name of the current pod (autodetected from container host name by default). + */ + public void setPodName(String podName) { + this.lockConfiguration.setPodName(podName); + } + + public Map<String, String> getClusterLabels() { + return lockConfiguration.getClusterLabels(); + } + + /** + * Set the labels used to identify the pods composing the cluster. + */ + public void setClusterLabels(Map<String, String> clusterLabels) { + lockConfiguration.setClusterLabels(clusterLabels); + } + + public Long getWatchRefreshIntervalSeconds() { + return lockConfiguration.getWatchRefreshIntervalSeconds(); + } + + /** + * Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated. + * Watch recreation can be disabled by putting a negative value (the default will be used in case of null). + */ + public void setWatchRefreshIntervalSeconds(Long watchRefreshIntervalSeconds) { + lockConfiguration.setWatchRefreshIntervalSeconds(watchRefreshIntervalSeconds); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java new file mode 100644 index 0000000..9ac6a86 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import io.fabric8.kubernetes.client.KubernetesClient; + +import org.apache.camel.component.kubernetes.KubernetesConfiguration; +import org.apache.camel.component.kubernetes.KubernetesHelper; +import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent; +import org.apache.camel.component.kubernetes.ha.lock.KubernetesLeadershipController; +import org.apache.camel.component.kubernetes.ha.lock.KubernetesLockConfiguration; +import org.apache.camel.ha.CamelClusterMember; +import org.apache.camel.impl.ha.AbstractCamelClusterView; +import org.apache.camel.util.ObjectHelper; + +/** + * The cluster view on a specific Camel cluster namespace (not to be confused with Kubernetes namespaces). + * Namespaces are represented as keys in a Kubernetes ConfigMap (values are the current leader pods). + */ +public class KubernetesClusterView extends AbstractCamelClusterView { + + private KubernetesClient kubernetesClient; + + private KubernetesConfiguration configuration; + + private KubernetesLockConfiguration lockConfiguration; + + private KubernetesClusterMember localMember; + + private Map<String, KubernetesClusterMember> memberCache; + + private volatile Optional<CamelClusterMember> currentLeader = Optional.empty(); + + private volatile List<CamelClusterMember> currentMembers = Collections.emptyList(); + + private KubernetesLeadershipController controller; + + public KubernetesClusterView(KubernetesClusterService cluster, KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) { + super(cluster, lockConfiguration.getGroupName()); + this.configuration = configuration; + this.lockConfiguration = lockConfiguration; + this.localMember = new KubernetesClusterMember(lockConfiguration.getPodName()); + this.memberCache = new HashMap<>(); + } + + @Override + public Optional<CamelClusterMember> getMaster() { + return currentLeader; + } + + @Override + public CamelClusterMember getLocalMember() { + return localMember; + } + + @Override + public List<CamelClusterMember> getMembers() { + return currentMembers; + } + + @Override + protected void doStart() throws Exception { + if (controller == null) { + this.kubernetesClient = KubernetesHelper.getKubernetesClient(configuration); + + controller = new KubernetesLeadershipController(kubernetesClient, this.lockConfiguration, event -> { + if (event instanceof KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) { + // New leader + Optional<String> leader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(event).getData(); + currentLeader = leader.map(this::toMember); + if (currentLeader.isPresent()) { + fireLeadershipChangedEvent(currentLeader.get()); + } + } else if (event instanceof KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) { + Set<String> members = KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent.class.cast(event).getData(); + Set<String> oldMembers = currentMembers.stream().map(CamelClusterMember::getId).collect(Collectors.toSet()); + currentMembers = members.stream().map(this::toMember).collect(Collectors.toList()); + + // Computing differences + Set<String> added = new HashSet<>(members); + added.removeAll(oldMembers); + + Set<String> removed = new HashSet<>(oldMembers); + removed.removeAll(members); + + for (String id : added) { + fireMemberAddedEvent(toMember(id)); + } + + for (String id : removed) { + fireMemberRemovedEvent(toMember(id)); + } + } + }); + + controller.start(); + } + } + + @Override + protected void doStop() throws Exception { + if (controller != null) { + controller.stop(); + controller = null; + kubernetesClient.close(); + kubernetesClient = null; + } + } + + protected KubernetesClusterMember toMember(String name) { + if (name.equals(localMember.getId())) { + return localMember; + } + return memberCache.computeIfAbsent(name, KubernetesClusterMember::new); + } + + class KubernetesClusterMember implements CamelClusterMember { + + private String podName; + + public KubernetesClusterMember(String podName) { + this.podName = ObjectHelper.notNull(podName, "podName"); + } + + @Override + public boolean isMaster() { + return currentLeader.isPresent() && currentLeader.get().getId().equals(podName); + } + + @Override + public String getId() { + return podName; + } + + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("KubernetesClusterMember{"); + sb.append("podName='").append(podName).append('\''); + sb.append('}'); + return sb.toString(); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEvent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEvent.java new file mode 100644 index 0000000..59f8768 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEvent.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.lock; + +import java.util.Optional; +import java.util.Set; + +/** + * Super interface for events produced by the Kubernetes cluster. + */ +@FunctionalInterface +public interface KubernetesClusterEvent { + + Object getData(); + + /** + * Event signalling that the list of members of the Kubernetes cluster has changed. + */ + interface KubernetesClusterMemberListChangedEvent extends KubernetesClusterEvent { + @Override + Set<String> getData(); + } + + /** + * Event signalling the presence of a new leader. + */ + interface KubernetesClusterLeaderChangedEvent extends KubernetesClusterEvent { + @Override + Optional<String> getData(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEventHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEventHandler.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEventHandler.java new file mode 100644 index 0000000..0962847 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEventHandler.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.lock; + +/** + * Interface for handling Kubernetes cluster events. + */ +@FunctionalInterface +public interface KubernetesClusterEventHandler { + + void onKubernetesClusterEvent(KubernetesClusterEvent event); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java new file mode 100644 index 0000000..5555fe1 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.lock; + +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.Watcher; + +import org.apache.camel.Service; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Monitors continuously the configmap to detect changes in leadership. + * It calls the callback eventHandlers only when the leader changes w.r.t. the previous invocation. + */ +class KubernetesLeaderMonitor implements Service { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderMonitor.class); + + private ScheduledExecutorService serializedExecutor; + + private KubernetesClient kubernetesClient; + + private KubernetesLockConfiguration lockConfiguration; + + private List<KubernetesClusterEventHandler> eventHandlers; + + private Watch watch; + + private boolean terminated; + + private boolean refreshing; + + private ConfigMap latestConfigMap; + + public KubernetesLeaderMonitor(ScheduledExecutorService serializedExecutor, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration) { + this.serializedExecutor = serializedExecutor; + this.kubernetesClient = kubernetesClient; + this.lockConfiguration = lockConfiguration; + this.eventHandlers = new LinkedList<>(); + } + + public void addClusterEventHandler(KubernetesClusterEventHandler leaderEventHandler) { + this.eventHandlers.add(leaderEventHandler); + } + + @Override + public void start() throws Exception { + this.terminated = false; + serializedExecutor.execute(this::startWatch); + serializedExecutor.execute(() -> doPoll(true)); + + long recreationDelay = lockConfiguration.getWatchRefreshIntervalSecondsOrDefault(); + if (recreationDelay > 0) { + serializedExecutor.scheduleWithFixedDelay(this::refresh, recreationDelay, recreationDelay, TimeUnit.SECONDS); + } + } + + @Override + public void stop() throws Exception { + this.terminated = true; + Watch watch = this.watch; + if (watch != null) { + watch.close(); + } + } + + public void refresh() { + serializedExecutor.execute(() -> { + if (!terminated) { + refreshing = true; + try { + doPoll(false); + + Watch w = this.watch; + if (w != null) { + // It will be recreated + w.close(); + } + } finally { + refreshing = false; + } + } + }); + } + + private void startWatch() { + try { + LOG.debug("Starting ConfigMap watcher for monitoring the current leader"); + this.watch = kubernetesClient.configMaps() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .withName(this.lockConfiguration.getConfigMapName()) + .watch(new Watcher<ConfigMap>() { + + @Override + public void eventReceived(Action action, ConfigMap configMap) { + switch (action) { + case MODIFIED: + case DELETED: + case ADDED: + LOG.debug("Received update from watch on ConfigMap {}", configMap); + serializedExecutor.execute(() -> checkAndNotify(configMap)); + break; + default: + } + } + + @Override + public void onClose(KubernetesClientException e) { + if (!terminated) { + KubernetesLeaderMonitor.this.watch = null; + if (refreshing) { + LOG.info("Refreshing ConfigMap watcher..."); + serializedExecutor.execute(KubernetesLeaderMonitor.this::startWatch); + } else { + LOG.warn("ConfigMap watcher has been closed unexpectedly. Recreating it in 1 second...", e); + serializedExecutor.schedule(KubernetesLeaderMonitor.this::startWatch, 1, TimeUnit.SECONDS); + } + } + } + }); + } catch (Exception ex) { + LOG.warn("Unable to watch for configmap changes. Retrying in 5 seconds..."); + LOG.debug("Error while trying to watch the configmap", ex); + + this.serializedExecutor.schedule(this::startWatch, 5, TimeUnit.SECONDS); + } + } + + private void doPoll(boolean retry) { + LOG.debug("Starting poll to get configmap {}", this.lockConfiguration.getConfigMapName()); + ConfigMap configMap; + try { + configMap = pollConfigMap(); + } catch (Exception ex) { + if (retry) { + LOG.warn("ConfigMap poll failed. Retrying in 5 seconds...", ex); + this.serializedExecutor.schedule(() -> doPoll(true), 5, TimeUnit.SECONDS); + } else { + LOG.warn("ConfigMap poll failed", ex); + } + return; + } + + checkAndNotify(configMap); + } + + private void checkAndNotify(ConfigMap candidateConfigMap) { + LOG.debug("Checking configMap {}", candidateConfigMap); + ConfigMap newConfigMap = newest(this.latestConfigMap, candidateConfigMap); + Optional<String> leader = extractLeader(newConfigMap); + Optional<String> oldLeader = extractLeader(this.latestConfigMap); + + this.latestConfigMap = newConfigMap; + + LOG.debug("The new leader is {}. Old leader was {}", leader, oldLeader); + if (!leader.equals(oldLeader)) { + LOG.debug("Notifying the new leader to all eventHandlers"); + for (KubernetesClusterEventHandler eventHandler : eventHandlers) { + eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) () -> leader); + } + } else { + LOG.debug("Leader has not changed"); + } + } + + private ConfigMap pollConfigMap() { + return kubernetesClient.configMaps() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .withName(this.lockConfiguration.getConfigMapName()) + .get(); + } + + private Optional<String> extractLeader(ConfigMap configMap) { + Optional<String> leader = Optional.empty(); + if (configMap != null && configMap.getData() != null) { + leader = Optional.ofNullable(configMap.getData().get(this.lockConfiguration.getGroupName())); + } + return leader; + } + + private ConfigMap newest(ConfigMap configMap1, ConfigMap configMap2) { + ConfigMap newest = null; + + if (configMap1 != null && configMap2 == null) { + newest = configMap1; + } else if (configMap1 == null && configMap2 != null) { + newest = configMap2; + } + + if (newest == null) { + String rv1 = extractResourceVersion(configMap1); + String rv2 = extractResourceVersion(configMap2); + newest = newest(configMap1, configMap2, rv1, rv2); + } + + if (newest == null) { + String ct1 = extractCreationTimestamp(configMap1); + String ct2 = extractCreationTimestamp(configMap2); + // timestamps are string-comparable + newest = newest(configMap1, configMap2, ct1, ct2); + } + + return newest; + } + + private <T extends Comparable<T>> ConfigMap newest(ConfigMap configMap1, ConfigMap configMap2, T cmp1, T cmp2) { + if (cmp1 != null && cmp2 != null) { + int comp = cmp1.compareTo(cmp2); + if (comp > 0) { + return configMap1; + } else { + return configMap2; + } + } + return null; + } + + private String extractResourceVersion(ConfigMap configMap) { + if (configMap != null && configMap.getMetadata() != null) { + return configMap.getMetadata().getResourceVersion(); + } + return null; + } + + private String extractCreationTimestamp(ConfigMap configMap) { + if (configMap != null && configMap.getMetadata() != null) { + return configMap.getMetadata().getCreationTimestamp(); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java new file mode 100644 index 0000000..ad2f9bc --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.lock; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; + +import org.apache.camel.Service; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Start the monitors and participate to leader election when no active leaders are present. + * It communicates changes in leadership and cluster members to the given event handler. + */ +public class KubernetesLeadershipController implements Service { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeadershipController.class); + + private KubernetesClient kubernetesClient; + + private KubernetesLockConfiguration lockConfiguration; + + private ScheduledExecutorService executor; + + private KubernetesLeaderMonitor leaderMonitor; + + private KubernetesMembersMonitor membersMonitor; + + private Optional<String> currentLeader; + + private Set<String> currentMembers; + + private KubernetesClusterEventHandler eventHandler; + + public KubernetesLeadershipController(KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) { + + this.kubernetesClient = kubernetesClient; + this.lockConfiguration = lockConfiguration; + this.eventHandler = eventHandler; + + this.currentLeader = Optional.empty(); + this.currentMembers = Collections.emptySet(); + } + + @Override + public void start() throws Exception { + + if (executor == null) { + executor = Executors.newSingleThreadScheduledExecutor(); // No concurrency + leaderMonitor = new KubernetesLeaderMonitor(this.executor, this.kubernetesClient, this.lockConfiguration); + membersMonitor = new KubernetesMembersMonitor(this.executor, this.kubernetesClient, this.lockConfiguration); + + leaderMonitor.addClusterEventHandler(e -> executor.execute(() -> onLeaderChanged(e))); + if (eventHandler != null) { + leaderMonitor.addClusterEventHandler(eventHandler); + } + + membersMonitor.addClusterEventHandler(e -> executor.execute(() -> onMembersChanged(e))); + if (eventHandler != null) { + membersMonitor.addClusterEventHandler(eventHandler); + } + + // Start all services + leaderMonitor.start(); + membersMonitor.start(); + + // Fire a new election if possible + executor.execute(this::runLeaderElection); + } + + } + + @Override + public void stop() throws Exception { + if (executor != null) { + membersMonitor.stop(); + leaderMonitor.stop(); + executor.shutdown(); + executor.shutdownNow(); + + membersMonitor = null; + leaderMonitor = null; + executor = null; + } + } + + private void onLeaderChanged(KubernetesClusterEvent e) { + Optional<String> newLeader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(e).getData(); + if (!newLeader.isPresent()) { + executor.execute(this::tryLeaderElection); + } + this.currentLeader = newLeader; + } + + private void onMembersChanged(KubernetesClusterEvent e) { + Set<String> newMembers = KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent.class.cast(e).getData(); + if (currentLeader.isPresent()) { + // Check if the current leader is still present in the list + if (!newMembers.contains(currentLeader.get()) && currentMembers.contains(currentLeader.get())) { + executor.execute(this::runLeaderElection); + } + } + this.currentMembers = newMembers; + } + + private void runLeaderElection() { + boolean finished = false; + try { + finished = tryLeaderElection(); + } catch (Exception ex) { + LOG.warn("Exception while trying to acquire the leadership", ex); + } + + if (!finished) { + executor.schedule(this::runLeaderElection, 1, TimeUnit.SECONDS); + } + } + + private boolean tryLeaderElection() { + LOG.debug("Starting leader election"); + if (!currentMembers.contains(this.lockConfiguration.getPodName())) { + LOG.debug("The current pod ({}) is not in the list of participating pods {}. Cannot participate to the election", this.lockConfiguration.getPodName(), currentMembers); + return false; + } + + ConfigMap configMap = kubernetesClient.configMaps() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .withName(this.lockConfiguration.getConfigMapName()) + .get(); + + if (configMap == null) { + // No configmap created so far + LOG.info("Lock configmap is not present in the Kubernetes namespace. A new ConfigMap will be created"); + + ConfigMap newConfigMap = new ConfigMapBuilder(). + withNewMetadata() + .withName(this.lockConfiguration.getConfigMapName()) + .addToLabels("provider", "camel") + .addToLabels("kind", "locks"). + endMetadata() + .addToData(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName()) + .build(); + + try { + kubernetesClient.configMaps() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .create(newConfigMap); + } catch (Exception ex) { + // Suppress exception + LOG.warn("Unable to create the ConfigMap, it may have been created by other cluster members concurrently. If the problem persists, check if the service account has the right " + + "permissions to create it"); + LOG.debug("Exception while trying to create the ConfigMap", ex); + return false; + } + return true; + } else { + LOG.info("Lock configmap already present in the Kubernetes namespace. Checking..."); + Map<String, String> leaders = configMap.getData(); + Optional<String> oldLeader = leaders != null ? Optional.ofNullable(leaders.get(this.lockConfiguration.getGroupName())) : Optional.empty(); + + boolean noLeaderPresent = !oldLeader.isPresent() || !currentMembers.contains(oldLeader.get()); + boolean alreadyLeader = oldLeader.isPresent() && oldLeader.get().equals(this.lockConfiguration.getPodName()); + + if (noLeaderPresent && !alreadyLeader) { + LOG.info("Trying to acquire the lock in configmap={}, key={}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName()); + ConfigMap newConfigMap = new ConfigMapBuilder(configMap) + .addToData(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName()) + .build(); + + kubernetesClient.configMaps() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .withName(this.lockConfiguration.getConfigMapName()) + .lockResourceVersion(configMap.getMetadata().getResourceVersion()) + .replace(newConfigMap); + + LOG.info("Lock acquired for configmap={}, key={}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName()); + } else if (!noLeaderPresent) { + LOG.info("A leader is already present for configmap={}, key={}: {}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName(), oldLeader); + } else { + LOG.info("This pod ({}) is already the leader for configmap={}, key={}", this.lockConfiguration.getPodName(), this.lockConfiguration.getConfigMapName(), this.lockConfiguration + .getGroupName()); + } + return true; + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java new file mode 100644 index 0000000..f203c0a --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.lock; + +import java.util.HashMap; +import java.util.Map; + +import io.fabric8.kubernetes.client.KubernetesClient; + +/** + * Configuration for Kubernetes Lock. + */ +public class KubernetesLockConfiguration implements Cloneable { + + private static final long DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS = 1800; + + /** + * Kubernetes namespace containing the pods and the ConfigMap used for locking. + */ + private String kubernetesResourcesNamespace; + + /** + * Name of the ConfigMap used for locking. + */ + private String configMapName; + + /** + * Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfgMap. + */ + private String groupName; + + /** + * Name of the current pod (defaults to host name). + */ + private String podName; + + /** + * Labels used to identify the members of the cluster. + */ + private Map<String, String> clusterLabels = new HashMap<>(); + + /** + * Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated. + * Watch recreation can be disabled by putting a negative value (the default will be used in case of null). + */ + private Long watchRefreshIntervalSeconds; + + public KubernetesLockConfiguration() { + } + + public String getKubernetesResourcesNamespaceOrDefault(KubernetesClient kubernetesClient) { + if (kubernetesResourcesNamespace != null) { + return kubernetesResourcesNamespace; + } + return kubernetesClient.getNamespace(); + } + + public String getKubernetesResourcesNamespace() { + return kubernetesResourcesNamespace; + } + + public void setKubernetesResourcesNamespace(String kubernetesResourcesNamespace) { + this.kubernetesResourcesNamespace = kubernetesResourcesNamespace; + } + + public String getConfigMapName() { + return configMapName; + } + + public void setConfigMapName(String configMapName) { + this.configMapName = configMapName; + } + + public String getGroupName() { + return groupName; + } + + public void setGroupName(String groupName) { + this.groupName = groupName; + } + + public String getPodName() { + return podName; + } + + public void setPodName(String podName) { + this.podName = podName; + } + + public Map<String, String> getClusterLabels() { + return clusterLabels; + } + + public void addToClusterLabels(String key, String value) { + this.clusterLabels.put(key, value); + } + + public void setClusterLabels(Map<String, String> clusterLabels) { + this.clusterLabels = clusterLabels; + } + + public Long getWatchRefreshIntervalSeconds() { + return watchRefreshIntervalSeconds; + } + + public long getWatchRefreshIntervalSecondsOrDefault() { + Long interval = watchRefreshIntervalSeconds; + if (interval == null) { + interval = DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS; + } + return interval; + } + + public void setWatchRefreshIntervalSeconds(Long watchRefreshIntervalSeconds) { + this.watchRefreshIntervalSeconds = watchRefreshIntervalSeconds; + } + + public KubernetesLockConfiguration copy() { + try { + KubernetesLockConfiguration copy = (KubernetesLockConfiguration) this.clone(); + return copy; + } catch (CloneNotSupportedException e) { + throw new IllegalStateException("Cannot clone", e); + } + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("KubernetesLockConfiguration{"); + sb.append("kubernetesResourcesNamespace='").append(kubernetesResourcesNamespace).append('\''); + sb.append(", configMapName='").append(configMapName).append('\''); + sb.append(", groupName='").append(groupName).append('\''); + sb.append(", podName='").append(podName).append('\''); + sb.append(", clusterLabels=").append(clusterLabels); + sb.append(", watchRefreshIntervalSeconds=").append(watchRefreshIntervalSeconds); + sb.append('}'); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java new file mode 100644 index 0000000..d9173b2 --- /dev/null +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kubernetes.ha.lock; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.Watcher; + +import org.apache.camel.Service; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Monitors the list of participants in a leader election and provides the most recently updated list. + * It calls the callback eventHandlers only when the member set changes w.r.t. the previous invocation. + */ +class KubernetesMembersMonitor implements Service { + + private static final long DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS = 1800; + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesMembersMonitor.class); + + private ScheduledExecutorService serializedExecutor; + + private KubernetesClient kubernetesClient; + + private KubernetesLockConfiguration lockConfiguration; + + private List<KubernetesClusterEventHandler> eventHandlers; + + private Watch watch; + + private boolean terminated; + + private boolean refreshing; + + private Set<String> previousMembers = new HashSet<>(); + + private Set<String> basePoll = new HashSet<>(); + private Set<String> deleted = new HashSet<>(); + private Set<String> added = new HashSet<>(); + + public KubernetesMembersMonitor(ScheduledExecutorService serializedExecutor, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration) { + this.serializedExecutor = serializedExecutor; + this.kubernetesClient = kubernetesClient; + this.lockConfiguration = lockConfiguration; + this.eventHandlers = new LinkedList<>(); + } + + public void addClusterEventHandler(KubernetesClusterEventHandler eventHandler) { + this.eventHandlers.add(eventHandler); + } + + @Override + public void start() throws Exception { + serializedExecutor.execute(() -> doPoll(true)); + serializedExecutor.execute(this::createWatch); + + long recreationDelay = lockConfiguration.getWatchRefreshIntervalSecondsOrDefault(); + if (recreationDelay > 0) { + serializedExecutor.scheduleWithFixedDelay(this::refresh, recreationDelay, recreationDelay, TimeUnit.SECONDS); + } + } + + private void createWatch() { + try { + LOG.debug("Starting cluster members watcher"); + this.watch = kubernetesClient.pods() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .withLabels(this.lockConfiguration.getClusterLabels()) + .watch(new Watcher<Pod>() { + + @Override + public void eventReceived(Action action, Pod pod) { + switch (action) { + case DELETED: + serializedExecutor.execute(() -> deleteAndNotify(podName(pod))); + break; + case ADDED: + serializedExecutor.execute(() -> addAndNotify(podName(pod))); + break; + default: + } + } + + @Override + public void onClose(KubernetesClientException e) { + if (!terminated) { + KubernetesMembersMonitor.this.watch = null; + if (refreshing) { + LOG.info("Refreshing pod list watcher..."); + serializedExecutor.execute(KubernetesMembersMonitor.this::createWatch); + } else { + LOG.warn("Pod list watcher has been closed unexpectedly. Recreating it in 1 second...", e); + serializedExecutor.schedule(KubernetesMembersMonitor.this::createWatch, 1, TimeUnit.SECONDS); + } + } + } + }); + } catch (Exception ex) { + LOG.warn("Unable to watch for pod list changes. Retrying in 5 seconds..."); + LOG.debug("Error while trying to watch the pod list", ex); + + serializedExecutor.schedule(this::createWatch, 5, TimeUnit.SECONDS); + } + } + + @Override + public void stop() throws Exception { + this.terminated = true; + Watch watch = this.watch; + if (watch != null) { + watch.close(); + } + } + + public void refresh() { + serializedExecutor.execute(() -> { + if (!terminated) { + refreshing = true; + try { + doPoll(false); + + Watch w = this.watch; + if (w != null) { + // It will be recreated + w.close(); + } + } finally { + refreshing = false; + } + } + }); + } + + private void doPoll(boolean retry) { + LOG.debug("Starting poll to get all cluster members"); + List<Pod> pods; + try { + pods = pollPods(); + } catch (Exception ex) { + if (retry) { + LOG.warn("Pods poll failed. Retrying in 5 seconds...", ex); + this.serializedExecutor.schedule(() -> doPoll(true), 5, TimeUnit.SECONDS); + } else { + LOG.warn("Pods poll failed", ex); + } + return; + } + + this.basePoll = pods.stream() + .map(p -> Optional.ofNullable(podName(p))) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toSet()); + + this.added = new HashSet<>(); + this.deleted = new HashSet<>(); + + LOG.debug("Base list of members is {}", this.basePoll); + + checkAndNotify(); + } + + private List<Pod> pollPods() { + return kubernetesClient.pods() + .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient)) + .withLabels(this.lockConfiguration.getClusterLabels()) + .list().getItems(); + } + + private String podName(Pod pod) { + if (pod != null && pod.getMetadata() != null) { + return pod.getMetadata().getName(); + } + return null; + } + + private void checkAndNotify() { + Set<String> newMembers = new HashSet<>(basePoll); + newMembers.addAll(added); + newMembers.removeAll(deleted); + + LOG.debug("Current list of members is: {}", newMembers); + + if (!newMembers.equals(this.previousMembers)) { + LOG.debug("List of members changed: sending notifications"); + this.previousMembers = newMembers; + + for (KubernetesClusterEventHandler eventHandler : eventHandlers) { + eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) () -> newMembers); + } + } else { + LOG.debug("List of members has not changed"); + } + } + + private void addAndNotify(String member) { + LOG.debug("Adding new member to the list: {}", member); + if (member != null) { + this.added.add(member); + checkAndNotify(); + } + } + + private void deleteAndNotify(String member) { + LOG.debug("Deleting member to the list: {}", member); + if (member != null) { + this.deleted.add(member); + checkAndNotify(); + } + } + +}