CAMEL-11331: Implemented KubernetesClusterService

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/45481262
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/45481262
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/45481262

Branch: refs/heads/master
Commit: 45481262c44d4e7caa4749725e01687a53916668
Parents: 9fc6d0b
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Authored: Fri Jun 30 17:42:33 2017 +0200
Committer: Nicola Ferraro <ni.ferr...@gmail.com>
Committed: Tue Aug 8 16:39:43 2017 +0200

----------------------------------------------------------------------
 components/camel-kubernetes/pom.xml             |   6 +-
 .../kubernetes/AbstractKubernetesEndpoint.java  |  53 +---
 .../kubernetes/KubernetesConfiguration.java     |  15 +-
 .../component/kubernetes/KubernetesHelper.java  |  98 +++++++
 .../kubernetes/ha/KubernetesClusterService.java | 151 +++++++++++
 .../kubernetes/ha/KubernetesClusterView.java    | 168 ++++++++++++
 .../ha/lock/KubernetesClusterEvent.java         |  46 ++++
 .../ha/lock/KubernetesClusterEventHandler.java  |  27 ++
 .../ha/lock/KubernetesLeaderMonitor.java        | 256 +++++++++++++++++++
 .../ha/lock/KubernetesLeadershipController.java | 211 +++++++++++++++
 .../ha/lock/KubernetesLockConfiguration.java    | 153 +++++++++++
 .../ha/lock/KubernetesMembersMonitor.java       | 239 +++++++++++++++++
 12 files changed, 1368 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/pom.xml 
b/components/camel-kubernetes/pom.xml
index e5409c8..c444068 100644
--- a/components/camel-kubernetes/pom.xml
+++ b/components/camel-kubernetes/pom.xml
@@ -44,12 +44,14 @@
     <dependency>
       <groupId>io.fabric8</groupId>
       <artifactId>kubernetes-client</artifactId>
-      <version>${kubernetes-client-version}</version>
+      <version>2.3-SNAPSHOT</version>
+      <!--<version>${kubernetes-client-version}</version>-->
     </dependency>
     <dependency>
       <groupId>io.fabric8</groupId>
       <artifactId>openshift-client</artifactId>
-      <version>${kubernetes-client-version}</version>
+      <version>2.3-SNAPSHOT</version>
+      <!--<version>${kubernetes-client-version}</version>-->
     </dependency>
     <!-- testing -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
index f48bf6d..b7aeb37 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
@@ -18,14 +18,10 @@ package org.apache.camel.component.kubernetes;
 
 import java.util.concurrent.ExecutorService;
 
-import io.fabric8.kubernetes.client.Config;
-import io.fabric8.kubernetes.client.ConfigBuilder;
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
 
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.UriParam;
-import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +50,7 @@ public abstract class AbstractKubernetesEndpoint extends 
DefaultEndpoint {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        client = configuration.getKubernetesClient() != null ? 
configuration.getKubernetesClient() : createKubernetesClient();
+        client = KubernetesHelper.getKubernetesClient(configuration);
     }
 
     @Override
@@ -80,52 +76,5 @@ public abstract class AbstractKubernetesEndpoint extends 
DefaultEndpoint {
         return configuration;
     }
 
-    private KubernetesClient createKubernetesClient() {
-        LOG.debug("Create Kubernetes client with the following Configuration: 
" + configuration.toString());
 
-        ConfigBuilder builder = new ConfigBuilder();
-        builder.withMasterUrl(configuration.getMasterUrl());
-        if ((ObjectHelper.isNotEmpty(configuration.getUsername())
-                && ObjectHelper.isNotEmpty(configuration.getPassword()))
-                && ObjectHelper.isEmpty(configuration.getOauthToken())) {
-            builder.withUsername(configuration.getUsername());
-            builder.withPassword(configuration.getPassword());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getOauthToken())) {
-            builder.withOauthToken(configuration.getOauthToken());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) {
-            builder.withCaCertData(configuration.getCaCertData());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) {
-            builder.withCaCertFile(configuration.getCaCertFile());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) {
-            builder.withClientCertData(configuration.getClientCertData());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) {
-            builder.withClientCertFile(configuration.getClientCertFile());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) {
-            builder.withApiVersion(configuration.getApiVersion());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) {
-            builder.withClientKeyAlgo(configuration.getClientKeyAlgo());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) {
-            builder.withClientKeyData(configuration.getClientKeyData());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) {
-            builder.withClientKeyFile(configuration.getClientKeyFile());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) {
-            
builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase());
-        }
-        if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) {
-            builder.withTrustCerts(configuration.getTrustCerts());
-        }
-
-        Config conf = builder.build();
-        return new DefaultKubernetesClient(conf);
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
index 89d0d9a..271ef71 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
@@ -19,13 +19,14 @@ package org.apache.camel.component.kubernetes;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
 
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
 
 @UriParams
-public class KubernetesConfiguration {
+public class KubernetesConfiguration implements Cloneable {
 
     @UriPath
     @Metadata(required = "true")
@@ -395,6 +396,18 @@ public class KubernetesConfiguration {
         this.resourceName = resourceName;
     }
 
+    // ****************************************
+    // Copy
+    // ****************************************
+
+    public KubernetesConfiguration copy() {
+        try {
+            return (KubernetesConfiguration) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+
     @Override
     public String toString() {
         return "KubernetesConfiguration [masterUrl=" + masterUrl + ", 
category=" + category + ", kubernetesClient="

http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesHelper.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesHelper.java
new file mode 100644
index 0000000..62235ad
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesHelper.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper methods for Kubernetes resources.
+ */
+public final class KubernetesHelper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesHelper.class);
+
+    private KubernetesHelper() {
+    }
+
+    public static KubernetesClient getKubernetesClient(KubernetesConfiguration 
configuration) {
+        if (configuration.getKubernetesClient() != null) {
+            return configuration.getKubernetesClient();
+        } else if (configuration.getMasterUrl() != null) {
+            return createKubernetesClient(configuration);
+        } else {
+            LOG.info("Creating default kubernetes client without applying 
configuration");
+            return new DefaultKubernetesClient();
+        }
+    }
+
+    private static KubernetesClient 
createKubernetesClient(KubernetesConfiguration configuration) {
+        LOG.debug("Create Kubernetes client with the following Configuration: 
" + configuration.toString());
+
+        ConfigBuilder builder = new ConfigBuilder();
+        builder.withMasterUrl(configuration.getMasterUrl());
+        if ((ObjectHelper.isNotEmpty(configuration.getUsername())
+                && ObjectHelper.isNotEmpty(configuration.getPassword()))
+                && ObjectHelper.isEmpty(configuration.getOauthToken())) {
+            builder.withUsername(configuration.getUsername());
+            builder.withPassword(configuration.getPassword());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getOauthToken())) {
+            builder.withOauthToken(configuration.getOauthToken());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) {
+            builder.withCaCertData(configuration.getCaCertData());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) {
+            builder.withCaCertFile(configuration.getCaCertFile());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) {
+            builder.withClientCertData(configuration.getClientCertData());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) {
+            builder.withClientCertFile(configuration.getClientCertFile());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) {
+            builder.withApiVersion(configuration.getApiVersion());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) {
+            builder.withClientKeyAlgo(configuration.getClientKeyAlgo());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) {
+            builder.withClientKeyData(configuration.getClientKeyData());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) {
+            builder.withClientKeyFile(configuration.getClientKeyFile());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) {
+            
builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase());
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) {
+            builder.withTrustCerts(configuration.getTrustCerts());
+        }
+
+        Config conf = builder.build();
+        return new DefaultKubernetesClient(conf);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
new file mode 100644
index 0000000..6d87d48
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha;
+
+import java.net.InetAddress;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import 
org.apache.camel.component.kubernetes.ha.lock.KubernetesLockConfiguration;
+import org.apache.camel.impl.ha.AbstractCamelClusterService;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * A Kubernetes based cluster service leveraging Kubernetes optimistic locks 
on resources (specifically ConfigMaps).
+ */
+public class KubernetesClusterService extends 
AbstractCamelClusterService<KubernetesClusterView> {
+
+    public static final String DEFAULT_CONFIGMAP_NAME = "leaders";
+
+    private KubernetesConfiguration configuration;
+
+    private KubernetesLockConfiguration lockConfiguration;
+
+    public KubernetesClusterService() {
+        this.configuration = new KubernetesConfiguration();
+        this.lockConfiguration = new KubernetesLockConfiguration();
+    }
+
+    public KubernetesClusterService(KubernetesConfiguration configuration) {
+        this.configuration = configuration.copy();
+        this.lockConfiguration = new KubernetesLockConfiguration();
+    }
+
+    public KubernetesClusterService(CamelContext camelContext, 
KubernetesConfiguration configuration) {
+        super(null, camelContext);
+        this.configuration = configuration.copy();
+        this.lockConfiguration = new KubernetesLockConfiguration();
+    }
+
+    @Override
+    protected KubernetesClusterView createView(String namespace) throws 
Exception {
+        KubernetesLockConfiguration lockConfig = 
configWithGroupNameAndDefaults(namespace);
+        return new KubernetesClusterView(this, configuration, lockConfig);
+    }
+
+    protected KubernetesLockConfiguration 
configWithGroupNameAndDefaults(String groupName) {
+        KubernetesLockConfiguration config = this.lockConfiguration.copy();
+
+        config.setGroupName(ObjectHelper.notNull(groupName, "groupName"));
+
+        // Check defaults (Namespace and podName can be null)
+        if (config.getConfigMapName() == null) {
+            config.setConfigMapName(DEFAULT_CONFIGMAP_NAME);
+        }
+        if (config.getPodName() == null) {
+            config.setPodName(System.getenv("HOSTNAME"));
+            if (config.getPodName() == null) {
+                try {
+                    
config.setPodName(InetAddress.getLocalHost().getHostName());
+                } catch (Exception e) {
+                    throw new RuntimeCamelException("Unable to determine pod 
name", e);
+                }
+            }
+        }
+
+        return config;
+    }
+
+    public String getMasterUrl() {
+        return configuration.getMasterUrl();
+    }
+
+    /**
+     * Set the URL of the Kubernetes master (read from Kubernetes client 
properties by default).
+     */
+    public void setMasterUrl(String masterUrl) {
+        configuration.setMasterUrl(masterUrl);
+    }
+
+    public String getKubernetesNamespace() {
+        return this.lockConfiguration.getKubernetesResourcesNamespace();
+    }
+
+    /**
+     * Set the name of the Kubernetes namespace containing the pods and the 
configmap (autodetected by default)
+     */
+    public void setKubernetesNamespace(String kubernetesNamespace) {
+        
this.lockConfiguration.setKubernetesResourcesNamespace(kubernetesNamespace);
+    }
+
+    public String getConfigMapName() {
+        return this.lockConfiguration.getConfigMapName();
+    }
+
+    /**
+     * Set the name of the ConfigMap used to do optimistic locking (defaults 
to 'leaders').
+     */
+    public void setConfigMapName(String configMapName) {
+        this.lockConfiguration.setConfigMapName(configMapName);
+    }
+
+    public String getPodName() {
+        return this.lockConfiguration.getPodName();
+    }
+
+    /**
+     * Set the name of the current pod (autodetected from container host name 
by default).
+     */
+    public void setPodName(String podName) {
+        this.lockConfiguration.setPodName(podName);
+    }
+
+    public Map<String, String> getClusterLabels() {
+        return lockConfiguration.getClusterLabels();
+    }
+
+    /**
+     * Set the labels used to identify the pods composing the cluster.
+     */
+    public void setClusterLabels(Map<String, String> clusterLabels) {
+        lockConfiguration.setClusterLabels(clusterLabels);
+    }
+
+    public Long getWatchRefreshIntervalSeconds() {
+        return lockConfiguration.getWatchRefreshIntervalSeconds();
+    }
+
+    /**
+     * Indicates the maximum amount of time a Kubernetes watch should be kept 
active, before being recreated.
+     * Watch recreation can be disabled by putting a negative value (the 
default will be used in case of null).
+     */
+    public void setWatchRefreshIntervalSeconds(Long 
watchRefreshIntervalSeconds) {
+        
lockConfiguration.setWatchRefreshIntervalSeconds(watchRefreshIntervalSeconds);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
new file mode 100644
index 0000000..9ac6a86
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.component.kubernetes.KubernetesHelper;
+import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent;
+import 
org.apache.camel.component.kubernetes.ha.lock.KubernetesLeadershipController;
+import 
org.apache.camel.component.kubernetes.ha.lock.KubernetesLockConfiguration;
+import org.apache.camel.ha.CamelClusterMember;
+import org.apache.camel.impl.ha.AbstractCamelClusterView;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * The cluster view on a specific Camel cluster namespace (not to be confused 
with Kubernetes namespaces).
+ * Namespaces are represented as keys in a Kubernetes ConfigMap (values are 
the current leader pods).
+ */
+public class KubernetesClusterView extends AbstractCamelClusterView {
+
+    private KubernetesClient kubernetesClient;
+
+    private KubernetesConfiguration configuration;
+
+    private KubernetesLockConfiguration lockConfiguration;
+
+    private KubernetesClusterMember localMember;
+
+    private Map<String, KubernetesClusterMember> memberCache;
+
+    private volatile Optional<CamelClusterMember> currentLeader = 
Optional.empty();
+
+    private volatile List<CamelClusterMember> currentMembers = 
Collections.emptyList();
+
+    private KubernetesLeadershipController controller;
+
+    public KubernetesClusterView(KubernetesClusterService cluster, 
KubernetesConfiguration configuration, KubernetesLockConfiguration 
lockConfiguration) {
+        super(cluster, lockConfiguration.getGroupName());
+        this.configuration = configuration;
+        this.lockConfiguration = lockConfiguration;
+        this.localMember = new 
KubernetesClusterMember(lockConfiguration.getPodName());
+        this.memberCache = new HashMap<>();
+    }
+
+    @Override
+    public Optional<CamelClusterMember> getMaster() {
+        return currentLeader;
+    }
+
+    @Override
+    public CamelClusterMember getLocalMember() {
+        return localMember;
+    }
+
+    @Override
+    public List<CamelClusterMember> getMembers() {
+        return currentMembers;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (controller == null) {
+            this.kubernetesClient = 
KubernetesHelper.getKubernetesClient(configuration);
+
+            controller = new KubernetesLeadershipController(kubernetesClient, 
this.lockConfiguration, event -> {
+                if (event instanceof 
KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) {
+                    // New leader
+                    Optional<String> leader = 
KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(event).getData();
+                    currentLeader = leader.map(this::toMember);
+                    if (currentLeader.isPresent()) {
+                        fireLeadershipChangedEvent(currentLeader.get());
+                    }
+                } else if (event instanceof 
KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) {
+                    Set<String> members = 
KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent.class.cast(event).getData();
+                    Set<String> oldMembers = 
currentMembers.stream().map(CamelClusterMember::getId).collect(Collectors.toSet());
+                    currentMembers = 
members.stream().map(this::toMember).collect(Collectors.toList());
+
+                    // Computing differences
+                    Set<String> added = new HashSet<>(members);
+                    added.removeAll(oldMembers);
+
+                    Set<String> removed = new HashSet<>(oldMembers);
+                    removed.removeAll(members);
+
+                    for (String id : added) {
+                        fireMemberAddedEvent(toMember(id));
+                    }
+
+                    for (String id : removed) {
+                        fireMemberRemovedEvent(toMember(id));
+                    }
+                }
+            });
+
+            controller.start();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (controller != null) {
+            controller.stop();
+            controller = null;
+            kubernetesClient.close();
+            kubernetesClient = null;
+        }
+    }
+
+    protected KubernetesClusterMember toMember(String name) {
+        if (name.equals(localMember.getId())) {
+            return localMember;
+        }
+        return memberCache.computeIfAbsent(name, KubernetesClusterMember::new);
+    }
+
+    class KubernetesClusterMember implements CamelClusterMember {
+
+        private String podName;
+
+        public KubernetesClusterMember(String podName) {
+            this.podName = ObjectHelper.notNull(podName, "podName");
+        }
+
+        @Override
+        public boolean isMaster() {
+            return currentLeader.isPresent() && 
currentLeader.get().getId().equals(podName);
+        }
+
+        @Override
+        public String getId() {
+            return podName;
+        }
+
+
+        @Override
+        public String toString() {
+            final StringBuilder sb = new 
StringBuilder("KubernetesClusterMember{");
+            sb.append("podName='").append(podName).append('\'');
+            sb.append('}');
+            return sb.toString();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEvent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEvent.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEvent.java
new file mode 100644
index 0000000..59f8768
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Super interface for events produced by the Kubernetes cluster.
+ */
+@FunctionalInterface
+public interface KubernetesClusterEvent {
+
+    Object getData();
+
+    /**
+     * Event signalling that the list of members of the Kubernetes cluster has 
changed.
+     */
+    interface KubernetesClusterMemberListChangedEvent extends 
KubernetesClusterEvent {
+        @Override
+        Set<String> getData();
+    }
+
+    /**
+     * Event signalling the presence of a new leader.
+     */
+    interface KubernetesClusterLeaderChangedEvent extends 
KubernetesClusterEvent {
+        @Override
+        Optional<String> getData();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEventHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEventHandler.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEventHandler.java
new file mode 100644
index 0000000..0962847
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEventHandler.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+/**
+ * Interface for handling Kubernetes cluster events.
+ */
+@FunctionalInterface
+public interface KubernetesClusterEventHandler {
+
+    void onKubernetesClusterEvent(KubernetesClusterEvent event);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
new file mode 100644
index 0000000..5555fe1
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+
+import org.apache.camel.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitors continuously the configmap to detect changes in leadership.
+ * It calls the callback eventHandlers only when the leader changes w.r.t. the 
previous invocation.
+ */
+class KubernetesLeaderMonitor implements Service {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesLeaderMonitor.class);
+
+    private ScheduledExecutorService serializedExecutor;
+
+    private KubernetesClient kubernetesClient;
+
+    private KubernetesLockConfiguration lockConfiguration;
+
+    private List<KubernetesClusterEventHandler> eventHandlers;
+
+    private Watch watch;
+
+    private boolean terminated;
+
+    private boolean refreshing;
+
+    private ConfigMap latestConfigMap;
+
+    public KubernetesLeaderMonitor(ScheduledExecutorService 
serializedExecutor, KubernetesClient kubernetesClient, 
KubernetesLockConfiguration lockConfiguration) {
+        this.serializedExecutor = serializedExecutor;
+        this.kubernetesClient = kubernetesClient;
+        this.lockConfiguration = lockConfiguration;
+        this.eventHandlers = new LinkedList<>();
+    }
+
+    public void addClusterEventHandler(KubernetesClusterEventHandler 
leaderEventHandler) {
+        this.eventHandlers.add(leaderEventHandler);
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.terminated = false;
+        serializedExecutor.execute(this::startWatch);
+        serializedExecutor.execute(() -> doPoll(true));
+
+        long recreationDelay = 
lockConfiguration.getWatchRefreshIntervalSecondsOrDefault();
+        if (recreationDelay > 0) {
+            serializedExecutor.scheduleWithFixedDelay(this::refresh, 
recreationDelay, recreationDelay, TimeUnit.SECONDS);
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        this.terminated = true;
+        Watch watch = this.watch;
+        if (watch != null) {
+            watch.close();
+        }
+    }
+
+    public void refresh() {
+        serializedExecutor.execute(() -> {
+            if (!terminated) {
+                refreshing = true;
+                try {
+                    doPoll(false);
+
+                    Watch w = this.watch;
+                    if (w != null) {
+                        // It will be recreated
+                        w.close();
+                    }
+                } finally {
+                    refreshing = false;
+                }
+            }
+        });
+    }
+
+    private void startWatch() {
+        try {
+            LOG.debug("Starting ConfigMap watcher for monitoring the current 
leader");
+            this.watch = kubernetesClient.configMaps()
+                    
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                    .withName(this.lockConfiguration.getConfigMapName())
+                    .watch(new Watcher<ConfigMap>() {
+
+                        @Override
+                        public void eventReceived(Action action, ConfigMap 
configMap) {
+                            switch (action) {
+                            case MODIFIED:
+                            case DELETED:
+                            case ADDED:
+                                LOG.debug("Received update from watch on 
ConfigMap {}", configMap);
+                                serializedExecutor.execute(() -> 
checkAndNotify(configMap));
+                                break;
+                            default:
+                            }
+                        }
+
+                        @Override
+                        public void onClose(KubernetesClientException e) {
+                            if (!terminated) {
+                                KubernetesLeaderMonitor.this.watch = null;
+                                if (refreshing) {
+                                    LOG.info("Refreshing ConfigMap 
watcher...");
+                                    
serializedExecutor.execute(KubernetesLeaderMonitor.this::startWatch);
+                                } else {
+                                    LOG.warn("ConfigMap watcher has been 
closed unexpectedly. Recreating it in 1 second...", e);
+                                    
serializedExecutor.schedule(KubernetesLeaderMonitor.this::startWatch, 1, 
TimeUnit.SECONDS);
+                                }
+                            }
+                        }
+                    });
+        } catch (Exception ex) {
+            LOG.warn("Unable to watch for configmap changes. Retrying in 5 
seconds...");
+            LOG.debug("Error while trying to watch the configmap", ex);
+
+            this.serializedExecutor.schedule(this::startWatch, 5, 
TimeUnit.SECONDS);
+        }
+    }
+
+    private void doPoll(boolean retry) {
+        LOG.debug("Starting poll to get configmap {}", 
this.lockConfiguration.getConfigMapName());
+        ConfigMap configMap;
+        try {
+            configMap = pollConfigMap();
+        } catch (Exception ex) {
+            if (retry) {
+                LOG.warn("ConfigMap poll failed. Retrying in 5 seconds...", 
ex);
+                this.serializedExecutor.schedule(() -> doPoll(true), 5, 
TimeUnit.SECONDS);
+            } else {
+                LOG.warn("ConfigMap poll failed", ex);
+            }
+            return;
+        }
+
+        checkAndNotify(configMap);
+    }
+
+    private void checkAndNotify(ConfigMap candidateConfigMap) {
+        LOG.debug("Checking configMap {}", candidateConfigMap);
+        ConfigMap newConfigMap = newest(this.latestConfigMap, 
candidateConfigMap);
+        Optional<String> leader = extractLeader(newConfigMap);
+        Optional<String> oldLeader = extractLeader(this.latestConfigMap);
+
+        this.latestConfigMap = newConfigMap;
+
+        LOG.debug("The new leader is {}. Old leader was {}", leader, 
oldLeader);
+        if (!leader.equals(oldLeader)) {
+            LOG.debug("Notifying the new leader to all eventHandlers");
+            for (KubernetesClusterEventHandler eventHandler : eventHandlers) {
+                
eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent)
 () -> leader);
+            }
+        } else {
+            LOG.debug("Leader has not changed");
+        }
+    }
+
+    private ConfigMap pollConfigMap() {
+        return kubernetesClient.configMaps()
+                
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                .withName(this.lockConfiguration.getConfigMapName())
+                .get();
+    }
+
+    private Optional<String> extractLeader(ConfigMap configMap) {
+        Optional<String> leader = Optional.empty();
+        if (configMap != null && configMap.getData() != null) {
+            leader = 
Optional.ofNullable(configMap.getData().get(this.lockConfiguration.getGroupName()));
+        }
+        return leader;
+    }
+
+    private ConfigMap newest(ConfigMap configMap1, ConfigMap configMap2) {
+        ConfigMap newest = null;
+
+        if (configMap1 != null && configMap2 == null) {
+            newest = configMap1;
+        } else if (configMap1 == null && configMap2 != null) {
+            newest = configMap2;
+        }
+
+        if (newest == null) {
+            String rv1 = extractResourceVersion(configMap1);
+            String rv2 = extractResourceVersion(configMap2);
+            newest = newest(configMap1, configMap2, rv1, rv2);
+        }
+
+        if (newest == null) {
+            String ct1 = extractCreationTimestamp(configMap1);
+            String ct2 = extractCreationTimestamp(configMap2);
+            // timestamps are string-comparable
+            newest = newest(configMap1, configMap2, ct1, ct2);
+        }
+
+        return newest;
+    }
+
+    private <T extends Comparable<T>> ConfigMap newest(ConfigMap configMap1, 
ConfigMap configMap2, T cmp1, T cmp2) {
+        if (cmp1 != null && cmp2 != null) {
+            int comp = cmp1.compareTo(cmp2);
+            if (comp > 0) {
+                return configMap1;
+            } else {
+                return configMap2;
+            }
+        }
+        return null;
+    }
+
+    private String extractResourceVersion(ConfigMap configMap) {
+        if (configMap != null && configMap.getMetadata() != null) {
+            return configMap.getMetadata().getResourceVersion();
+        }
+        return null;
+    }
+
+    private String extractCreationTimestamp(ConfigMap configMap) {
+        if (configMap != null && configMap.getMetadata() != null) {
+            return configMap.getMetadata().getCreationTimestamp();
+        }
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
new file mode 100644
index 0000000..ad2f9bc
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import org.apache.camel.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Start the monitors and participate to leader election when no active 
leaders are present.
+ * It communicates changes in leadership and cluster members to the given 
event handler.
+ */
+public class KubernetesLeadershipController implements Service {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesLeadershipController.class);
+
+    private KubernetesClient kubernetesClient;
+
+    private KubernetesLockConfiguration lockConfiguration;
+
+    private ScheduledExecutorService executor;
+
+    private KubernetesLeaderMonitor leaderMonitor;
+
+    private KubernetesMembersMonitor membersMonitor;
+
+    private Optional<String> currentLeader;
+
+    private Set<String> currentMembers;
+
+    private KubernetesClusterEventHandler eventHandler;
+
+    public KubernetesLeadershipController(KubernetesClient kubernetesClient, 
KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler 
eventHandler) {
+
+        this.kubernetesClient = kubernetesClient;
+        this.lockConfiguration = lockConfiguration;
+        this.eventHandler = eventHandler;
+
+        this.currentLeader = Optional.empty();
+        this.currentMembers = Collections.emptySet();
+    }
+
+    @Override
+    public void start() throws Exception {
+
+        if (executor == null) {
+            executor = Executors.newSingleThreadScheduledExecutor(); // No 
concurrency
+            leaderMonitor = new KubernetesLeaderMonitor(this.executor, 
this.kubernetesClient, this.lockConfiguration);
+            membersMonitor = new KubernetesMembersMonitor(this.executor, 
this.kubernetesClient, this.lockConfiguration);
+
+            leaderMonitor.addClusterEventHandler(e -> executor.execute(() -> 
onLeaderChanged(e)));
+            if (eventHandler != null) {
+                leaderMonitor.addClusterEventHandler(eventHandler);
+            }
+
+            membersMonitor.addClusterEventHandler(e -> executor.execute(() -> 
onMembersChanged(e)));
+            if (eventHandler != null) {
+                membersMonitor.addClusterEventHandler(eventHandler);
+            }
+
+            // Start all services
+            leaderMonitor.start();
+            membersMonitor.start();
+
+            // Fire a new election if possible
+            executor.execute(this::runLeaderElection);
+        }
+
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (executor != null) {
+            membersMonitor.stop();
+            leaderMonitor.stop();
+            executor.shutdown();
+            executor.shutdownNow();
+
+            membersMonitor = null;
+            leaderMonitor = null;
+            executor = null;
+        }
+    }
+
+    private void onLeaderChanged(KubernetesClusterEvent e) {
+        Optional<String> newLeader = 
KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(e).getData();
+        if (!newLeader.isPresent()) {
+            executor.execute(this::tryLeaderElection);
+        }
+        this.currentLeader = newLeader;
+    }
+
+    private void onMembersChanged(KubernetesClusterEvent e) {
+        Set<String> newMembers = 
KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent.class.cast(e).getData();
+        if (currentLeader.isPresent()) {
+            // Check if the current leader is still present in the list
+            if (!newMembers.contains(currentLeader.get()) && 
currentMembers.contains(currentLeader.get())) {
+                executor.execute(this::runLeaderElection);
+            }
+        }
+        this.currentMembers = newMembers;
+    }
+
+    private void runLeaderElection() {
+        boolean finished = false;
+        try {
+            finished = tryLeaderElection();
+        } catch (Exception ex) {
+            LOG.warn("Exception while trying to acquire the leadership", ex);
+        }
+
+        if (!finished) {
+            executor.schedule(this::runLeaderElection, 1, TimeUnit.SECONDS);
+        }
+    }
+
+    private boolean tryLeaderElection() {
+        LOG.debug("Starting leader election");
+        if (!currentMembers.contains(this.lockConfiguration.getPodName())) {
+            LOG.debug("The current pod ({}) is not in the list of 
participating pods {}. Cannot participate to the election", 
this.lockConfiguration.getPodName(), currentMembers);
+            return false;
+        }
+
+        ConfigMap configMap = kubernetesClient.configMaps()
+                
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                .withName(this.lockConfiguration.getConfigMapName())
+                .get();
+
+        if (configMap == null) {
+            // No configmap created so far
+            LOG.info("Lock configmap is not present in the Kubernetes 
namespace. A new ConfigMap will be created");
+
+            ConfigMap newConfigMap = new ConfigMapBuilder().
+                    withNewMetadata()
+                    .withName(this.lockConfiguration.getConfigMapName())
+                    .addToLabels("provider", "camel")
+                    .addToLabels("kind", "locks").
+                            endMetadata()
+                    .addToData(this.lockConfiguration.getGroupName(), 
this.lockConfiguration.getPodName())
+                    .build();
+
+            try {
+                kubernetesClient.configMaps()
+                        
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                        .create(newConfigMap);
+            } catch (Exception ex) {
+                // Suppress exception
+                LOG.warn("Unable to create the ConfigMap, it may have been 
created by other cluster members concurrently. If the problem persists, check 
if the service account has the right "
+                        + "permissions to create it");
+                LOG.debug("Exception while trying to create the ConfigMap", 
ex);
+                return false;
+            }
+            return true;
+        } else {
+            LOG.info("Lock configmap already present in the Kubernetes 
namespace. Checking...");
+            Map<String, String> leaders = configMap.getData();
+            Optional<String> oldLeader = leaders != null ? 
Optional.ofNullable(leaders.get(this.lockConfiguration.getGroupName())) : 
Optional.empty();
+
+            boolean noLeaderPresent = !oldLeader.isPresent() || 
!currentMembers.contains(oldLeader.get());
+            boolean alreadyLeader = oldLeader.isPresent() && 
oldLeader.get().equals(this.lockConfiguration.getPodName());
+
+            if (noLeaderPresent && !alreadyLeader) {
+                LOG.info("Trying to acquire the lock in configmap={}, key={}", 
this.lockConfiguration.getConfigMapName(), 
this.lockConfiguration.getGroupName());
+                ConfigMap newConfigMap = new ConfigMapBuilder(configMap)
+                        .addToData(this.lockConfiguration.getGroupName(), 
this.lockConfiguration.getPodName())
+                        .build();
+
+                kubernetesClient.configMaps()
+                        
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                        .withName(this.lockConfiguration.getConfigMapName())
+                        
.lockResourceVersion(configMap.getMetadata().getResourceVersion())
+                        .replace(newConfigMap);
+
+                LOG.info("Lock acquired for configmap={}, key={}", 
this.lockConfiguration.getConfigMapName(), 
this.lockConfiguration.getGroupName());
+            } else if (!noLeaderPresent) {
+                LOG.info("A leader is already present for configmap={}, 
key={}: {}", this.lockConfiguration.getConfigMapName(), 
this.lockConfiguration.getGroupName(), oldLeader);
+            } else {
+                LOG.info("This pod ({}) is already the leader for 
configmap={}, key={}", this.lockConfiguration.getPodName(), 
this.lockConfiguration.getConfigMapName(), this.lockConfiguration
+                        .getGroupName());
+            }
+            return true;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
new file mode 100644
index 0000000..f203c0a
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+/**
+ * Configuration for Kubernetes Lock.
+ */
+public class KubernetesLockConfiguration implements Cloneable {
+
+    private static final long DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS = 1800;
+
+    /**
+     * Kubernetes namespace containing the pods and the ConfigMap used for locking.
+     */
+    private String kubernetesResourcesNamespace;
+
+    /**
+     * Name of the ConfigMap used for locking.
+     */
+    private String configMapName;
+
+    /**
+     * Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfigMap.
+     */
+    private String groupName;
+
+    /**
+     * Name of the current pod (defaults to host name).
+     */
+    private String podName;
+
+    /**
+     * Labels used to identify the members of the cluster.
+     */
+    private Map<String, String> clusterLabels = new HashMap<>();
+
+    /**
+     * Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated.
+     * Watch recreation can be disabled by putting a negative value (the default will be used in case of null).
+     */
+    private Long watchRefreshIntervalSeconds;
+
+    public KubernetesLockConfiguration() {
+    }
+
+    /**
+     * Returns the configured namespace or, when none has been configured, the namespace of the given client.
+     *
+     * @param kubernetesClient the client providing the fallback namespace
+     * @return the namespace to use for the Kubernetes resources
+     */
+    public String getKubernetesResourcesNamespaceOrDefault(KubernetesClient kubernetesClient) {
+        if (kubernetesResourcesNamespace != null) {
+            return kubernetesResourcesNamespace;
+        }
+        return kubernetesClient.getNamespace();
+    }
+
+    public String getKubernetesResourcesNamespace() {
+        return kubernetesResourcesNamespace;
+    }
+
+    public void setKubernetesResourcesNamespace(String kubernetesResourcesNamespace) {
+        this.kubernetesResourcesNamespace = kubernetesResourcesNamespace;
+    }
+
+    public String getConfigMapName() {
+        return configMapName;
+    }
+
+    public void setConfigMapName(String configMapName) {
+        this.configMapName = configMapName;
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    public String getPodName() {
+        return podName;
+    }
+
+    public void setPodName(String podName) {
+        this.podName = podName;
+    }
+
+    public Map<String, String> getClusterLabels() {
+        return clusterLabels;
+    }
+
+    /**
+     * Adds a label to the cluster labels, creating the map if it has been set to null.
+     *
+     * @param key the label name
+     * @param value the label value
+     */
+    public void addToClusterLabels(String key, String value) {
+        if (this.clusterLabels == null) {
+            this.clusterLabels = new HashMap<>();
+        }
+        this.clusterLabels.put(key, value);
+    }
+
+    public void setClusterLabels(Map<String, String> clusterLabels) {
+        this.clusterLabels = clusterLabels;
+    }
+
+    public Long getWatchRefreshIntervalSeconds() {
+        return watchRefreshIntervalSeconds;
+    }
+
+    /**
+     * Returns the configured watch refresh interval, or the default (1800 seconds) when it is not set.
+     *
+     * @return the watch refresh interval in seconds
+     */
+    public long getWatchRefreshIntervalSecondsOrDefault() {
+        Long interval = watchRefreshIntervalSeconds;
+        if (interval == null) {
+            interval = DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS;
+        }
+        return interval;
+    }
+
+    public void setWatchRefreshIntervalSeconds(Long watchRefreshIntervalSeconds) {
+        this.watchRefreshIntervalSeconds = watchRefreshIntervalSeconds;
+    }
+
+    /**
+     * Creates an independent copy of this configuration.
+     * The cluster labels are copied into a new map, so that changes made to the copy do not affect the original.
+     *
+     * @return the copy
+     * @throws IllegalStateException if the object cannot be cloned
+     */
+    public KubernetesLockConfiguration copy() {
+        try {
+            KubernetesLockConfiguration copy = (KubernetesLockConfiguration) this.clone();
+            // clone() is shallow: without this, the labels map would be shared with the original
+            if (this.clusterLabels != null) {
+                copy.clusterLabels = new HashMap<>(this.clusterLabels);
+            }
+            return copy;
+        } catch (CloneNotSupportedException e) {
+            throw new IllegalStateException("Cannot clone", e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("KubernetesLockConfiguration{");
+        sb.append("kubernetesResourcesNamespace='").append(kubernetesResourcesNamespace).append('\'');
+        sb.append(", configMapName='").append(configMapName).append('\'');
+        sb.append(", groupName='").append(groupName).append('\'');
+        sb.append(", podName='").append(podName).append('\'');
+        sb.append(", clusterLabels=").append(clusterLabels);
+        sb.append(", watchRefreshIntervalSeconds=").append(watchRefreshIntervalSeconds);
+        sb.append('}');
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
new file mode 100644
index 0000000..d9173b2
--- /dev/null
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+
+import org.apache.camel.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitors the list of participants in a leader election and provides the 
most recently updated list.
+ * It calls the callback eventHandlers only when the member set changes w.r.t. 
the previous invocation.
+ */
+class KubernetesMembersMonitor implements Service {
+
+    private static final long DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS = 1800;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesMembersMonitor.class);
+
+    private ScheduledExecutorService serializedExecutor;
+
+    private KubernetesClient kubernetesClient;
+
+    private KubernetesLockConfiguration lockConfiguration;
+
+    private List<KubernetesClusterEventHandler> eventHandlers;
+
+    private Watch watch;
+
+    private boolean terminated;
+
+    private boolean refreshing;
+
+    private Set<String> previousMembers = new HashSet<>();
+
+    private Set<String> basePoll = new HashSet<>();
+    private Set<String> deleted = new HashSet<>();
+    private Set<String> added = new HashSet<>();
+
+    public KubernetesMembersMonitor(ScheduledExecutorService 
serializedExecutor, KubernetesClient kubernetesClient, 
KubernetesLockConfiguration lockConfiguration) {
+        this.serializedExecutor = serializedExecutor;
+        this.kubernetesClient = kubernetesClient;
+        this.lockConfiguration = lockConfiguration;
+        this.eventHandlers = new LinkedList<>();
+    }
+
+    public void addClusterEventHandler(KubernetesClusterEventHandler 
eventHandler) {
+        this.eventHandlers.add(eventHandler);
+    }
+
+    @Override
+    public void start() throws Exception {
+        serializedExecutor.execute(() -> doPoll(true));
+        serializedExecutor.execute(this::createWatch);
+
+        long recreationDelay = 
lockConfiguration.getWatchRefreshIntervalSecondsOrDefault();
+        if (recreationDelay > 0) {
+            serializedExecutor.scheduleWithFixedDelay(this::refresh, 
recreationDelay, recreationDelay, TimeUnit.SECONDS);
+        }
+    }
+
+    private void createWatch() {
+        try {
+            LOG.debug("Starting cluster members watcher");
+            this.watch = kubernetesClient.pods()
+                    
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                    .withLabels(this.lockConfiguration.getClusterLabels())
+                    .watch(new Watcher<Pod>() {
+
+                        @Override
+                        public void eventReceived(Action action, Pod pod) {
+                            switch (action) {
+                            case DELETED:
+                                serializedExecutor.execute(() -> 
deleteAndNotify(podName(pod)));
+                                break;
+                            case ADDED:
+                                serializedExecutor.execute(() -> 
addAndNotify(podName(pod)));
+                                break;
+                            default:
+                            }
+                        }
+
+                        @Override
+                        public void onClose(KubernetesClientException e) {
+                            if (!terminated) {
+                                KubernetesMembersMonitor.this.watch = null;
+                                if (refreshing) {
+                                    LOG.info("Refreshing pod list watcher...");
+                                    
serializedExecutor.execute(KubernetesMembersMonitor.this::createWatch);
+                                } else {
+                                    LOG.warn("Pod list watcher has been closed 
unexpectedly. Recreating it in 1 second...", e);
+                                    
serializedExecutor.schedule(KubernetesMembersMonitor.this::createWatch, 1, 
TimeUnit.SECONDS);
+                                }
+                            }
+                        }
+                    });
+        } catch (Exception ex) {
+            LOG.warn("Unable to watch for pod list changes. Retrying in 5 
seconds...");
+            LOG.debug("Error while trying to watch the pod list", ex);
+
+            serializedExecutor.schedule(this::createWatch, 5, 
TimeUnit.SECONDS);
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        this.terminated = true;
+        Watch watch = this.watch;
+        if (watch != null) {
+            watch.close();
+        }
+    }
+
+    public void refresh() {
+        serializedExecutor.execute(() -> {
+            if (!terminated) {
+                refreshing = true;
+                try {
+                    doPoll(false);
+
+                    Watch w = this.watch;
+                    if (w != null) {
+                        // It will be recreated
+                        w.close();
+                    }
+                } finally {
+                    refreshing = false;
+                }
+            }
+        });
+    }
+
+    private void doPoll(boolean retry) {
+        LOG.debug("Starting poll to get all cluster members");
+        List<Pod> pods;
+        try {
+            pods = pollPods();
+        } catch (Exception ex) {
+            if (retry) {
+                LOG.warn("Pods poll failed. Retrying in 5 seconds...", ex);
+                this.serializedExecutor.schedule(() -> doPoll(true), 5, 
TimeUnit.SECONDS);
+            } else {
+                LOG.warn("Pods poll failed", ex);
+            }
+            return;
+        }
+
+        this.basePoll = pods.stream()
+                .map(p -> Optional.ofNullable(podName(p)))
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .collect(Collectors.toSet());
+
+        this.added = new HashSet<>();
+        this.deleted = new HashSet<>();
+
+        LOG.debug("Base list of members is {}", this.basePoll);
+
+        checkAndNotify();
+    }
+
+    private List<Pod> pollPods() {
+        return kubernetesClient.pods()
+                
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                .withLabels(this.lockConfiguration.getClusterLabels())
+                .list().getItems();
+    }
+
+    private String podName(Pod pod) {
+        if (pod != null && pod.getMetadata() != null) {
+            return pod.getMetadata().getName();
+        }
+        return null;
+    }
+
+    private void checkAndNotify() {
+        Set<String> newMembers = new HashSet<>(basePoll);
+        newMembers.addAll(added);
+        newMembers.removeAll(deleted);
+
+        LOG.debug("Current list of members is: {}", newMembers);
+
+        if (!newMembers.equals(this.previousMembers)) {
+            LOG.debug("List of members changed: sending notifications");
+            this.previousMembers = newMembers;
+
+            for (KubernetesClusterEventHandler eventHandler : eventHandlers) {
+                
eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent)
 () -> newMembers);
+            }
+        } else {
+            LOG.debug("List of members has not changed");
+        }
+    }
+
+    private void addAndNotify(String member) {
+        LOG.debug("Adding new member to the list: {}", member);
+        if (member != null) {
+            this.added.add(member);
+            checkAndNotify();
+        }
+    }
+
+    private void deleteAndNotify(String member) {
+        LOG.debug("Deleting member to the list: {}", member);
+        if (member != null) {
+            this.deleted.add(member);
+            checkAndNotify();
+        }
+    }
+
+}

Reply via email to