This is an automated email from the ASF dual-hosted git repository.
nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 8cc227b Rebalancing cluster service
8cc227b is described below
commit 8cc227b65b0e5808cb4db7ea30ffcb2cb4e9cdfe
Author: nicolaferraro <[email protected]>
AuthorDate: Mon Nov 23 17:32:23 2020 +0100
Rebalancing cluster service
---
components/camel-kubernetes/pom.xml | 5 +
.../cluster/KubernetesClusterService.java | 54 +++-
.../kubernetes/cluster/KubernetesClusterView.java | 18 +-
.../kubernetes/cluster/LeaseResourceType.java | 29 +++
.../cluster/lock/ConfigMapLockUtils.java | 101 --------
.../lock/KubernetesLeadershipController.java | 229 +++++++++++++----
.../lock/KubernetesLeaseResourceManager.java | 79 ++++++
.../cluster/lock/KubernetesLockConfiguration.java | 61 ++++-
.../kubernetes/cluster/lock/LeaderInfo.java | 14 +-
.../cluster/lock/TimedLeaderNotifier.java | 16 +-
.../lock/impl/ConfigMapLeaseResourceManager.java | 144 +++++++++++
.../lock/impl/NativeLeaseResourceManager.java | 164 +++++++++++++
.../cluster/KubernetesClusterServiceTest.java | 273 ++++++++++++++++-----
.../cluster/utils/ConfigMapLockSimulator.java | 68 ++---
.../cluster/utils/LeaseLockSimulator.java | 56 +++++
.../kubernetes/cluster/utils/LockTestServer.java | 200 +++++++++------
...ckSimulator.java => ResourceLockSimulator.java} | 50 ++--
.../cluster/CamelPreemptiveClusterService.java | 28 +++
.../camel/cluster/CamelPreemptiveClusterView.java | 35 +++
.../cluster/RebalancingCamelClusterService.java | 267 ++++++++++++++++++++
20 files changed, 1506 insertions(+), 385 deletions(-)
diff --git a/components/camel-kubernetes/pom.xml
b/components/camel-kubernetes/pom.xml
index 6da12a5..01e48c5 100644
--- a/components/camel-kubernetes/pom.xml
+++ b/components/camel-kubernetes/pom.xml
@@ -111,6 +111,11 @@
<version>${commons-codec-version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterService.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterService.java
index 9b24b7e..d9e1ace 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterService.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterService.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.cluster.CamelPreemptiveClusterService;
import org.apache.camel.component.kubernetes.KubernetesConfiguration;
import
org.apache.camel.component.kubernetes.cluster.lock.KubernetesLockConfiguration;
import org.apache.camel.support.cluster.AbstractCamelClusterService;
@@ -29,11 +30,12 @@ import org.apache.camel.util.ObjectHelper;
/**
* A Kubernetes based cluster service leveraging Kubernetes optimistic locks
on resources (specifically ConfigMaps).
*/
-public class KubernetesClusterService extends
AbstractCamelClusterService<KubernetesClusterView> {
+public class KubernetesClusterService extends
AbstractCamelClusterService<KubernetesClusterView>
+ implements CamelPreemptiveClusterService {
- private KubernetesConfiguration configuration;
+ protected KubernetesConfiguration configuration;
- private KubernetesLockConfiguration lockConfiguration;
+ protected KubernetesLockConfiguration lockConfiguration;
public KubernetesClusterService() {
this.configuration = new KubernetesConfiguration();
@@ -58,6 +60,11 @@ public class KubernetesClusterService extends
AbstractCamelClusterService<Kubern
return new KubernetesClusterView(getCamelContext(), this, config,
lockConfig);
}
+ @Override
+ public KubernetesClusterView getView(String namespace) throws Exception {
+ return (KubernetesClusterView) super.getView(namespace);
+ }
+
protected KubernetesConfiguration setConfigDefaults(
KubernetesConfiguration configuration, KubernetesLockConfiguration
lockConfiguration) {
if (configuration.getConnectionTimeout() == null) {
@@ -106,8 +113,8 @@ public class KubernetesClusterService extends
AbstractCamelClusterService<Kubern
}
if (config.getLeaseDurationMillis() <=
config.getRenewDeadlineMillis()) {
throw new IllegalStateException(
- "leaseDurationMillis must be greater than
renewDeadlineMillis " + "(" + config.getLeaseDurationMillis()
- + " is not greater than "
+ "leaseDurationMillis must be greater than
renewDeadlineMillis ("
+ + config.getLeaseDurationMillis()
+ " is not greater than "
+ config.getRenewDeadlineMillis()
+ ")");
}
if (config.getRenewDeadlineMillis() <= config.getJitterFactor() *
config.getRetryPeriodMillis()) {
@@ -154,15 +161,47 @@ public class KubernetesClusterService extends
AbstractCamelClusterService<Kubern
this.lockConfiguration.setKubernetesResourcesNamespace(kubernetesNamespace);
}
+ /**
+ * @return the resource name
+ * @deprecated Use {@link #getKubernetesResourceName()}
+ */
+ @Deprecated
public String getConfigMapName() {
return this.lockConfiguration.getConfigMapName();
}
/**
* Set the name of the ConfigMap used to do optimistic locking (defaults
to 'leaders').
+ *
+ * @param kubernetesResourceName the resource name
+ * @deprecated Use {@link
#setKubernetesResourceName(String)}
+ */
+ @Deprecated
+ public void setConfigMapName(String kubernetesResourceName) {
+ this.lockConfiguration.setConfigMapName(kubernetesResourceName);
+ }
+
+ public LeaseResourceType getLeaseResourceType() {
+ return this.lockConfiguration.getLeaseResourceType();
+ }
+
+ /**
+ * Set the lease resource type used in Kubernetes (defaults to 'Lease',
from coordination.k8s.io).
*/
- public void setConfigMapName(String configMapName) {
- this.lockConfiguration.setConfigMapName(configMapName);
+ public void setLeaseResourceType(LeaseResourceType type) {
+ this.lockConfiguration.setLeaseResourceType(type);
+ }
+
+ public String getKubernetesResourceName() {
+ return this.lockConfiguration.getKubernetesResourceName();
+ }
+
+ /**
+ * Set the name of the lease resource used to do optimistic locking
(defaults to 'leaders'). Resource name is used
+ * as prefix when the underlying Kubernetes resource can mange a single
lock.
+ */
+ public void setKubernetesResourceName(String kubernetesResourceName) {
+
this.lockConfiguration.setKubernetesResourceName(kubernetesResourceName);
}
public String getPodName() {
@@ -235,4 +274,5 @@ public class KubernetesClusterService extends
AbstractCamelClusterService<Kubern
public void setRetryPeriodMillis(long retryPeriodMillis) {
lockConfiguration.setRetryPeriodMillis(retryPeriodMillis);
}
+
}
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterView.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterView.java
index a2e80db..f684110 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterView.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterView.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.camel.CamelContext;
import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.cluster.CamelPreemptiveClusterView;
import org.apache.camel.component.kubernetes.KubernetesConfiguration;
import org.apache.camel.component.kubernetes.KubernetesHelper;
import
org.apache.camel.component.kubernetes.cluster.lock.KubernetesClusterEvent;
@@ -40,7 +41,7 @@ import org.apache.camel.util.ObjectHelper;
* The cluster view on a specific Camel cluster namespace (not to be confused
with Kubernetes namespaces). Namespaces
* are represented as keys in a Kubernetes ConfigMap (values are the current
leader pods).
*/
-public class KubernetesClusterView extends AbstractCamelClusterView {
+public class KubernetesClusterView extends AbstractCamelClusterView implements
CamelPreemptiveClusterView {
private CamelContext camelContext;
@@ -60,6 +61,8 @@ public class KubernetesClusterView extends
AbstractCamelClusterView {
private KubernetesLeadershipController controller;
+ private boolean disabled;
+
public KubernetesClusterView(CamelContext camelContext,
KubernetesClusterService cluster,
KubernetesConfiguration configuration,
KubernetesLockConfiguration
lockConfiguration) {
@@ -69,6 +72,7 @@ public class KubernetesClusterView extends
AbstractCamelClusterView {
this.lockConfiguration = ObjectHelper.notNull(lockConfiguration,
"lockConfiguration");
this.localMember = new
KubernetesClusterMember(lockConfiguration.getPodName());
this.memberCache = new HashMap<>();
+ this.disabled = false;
}
@Override
@@ -86,6 +90,17 @@ public class KubernetesClusterView extends
AbstractCamelClusterView {
return currentMembers;
}
+ public boolean isDisabled() {
+ return disabled;
+ }
+
+ public void setDisabled(boolean disabled) {
+ this.disabled = disabled;
+ if (this.controller != null) {
+ this.controller.setDisabled(disabled);
+ }
+ }
+
@Override
protected void doStart() throws Exception {
if (controller == null) {
@@ -121,6 +136,7 @@ public class KubernetesClusterView extends
AbstractCamelClusterView {
}
});
+ this.controller.setDisabled(disabled);
controller.start();
}
}
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/LeaseResourceType.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/LeaseResourceType.java
new file mode 100644
index 0000000..0561f76
--- /dev/null
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/LeaseResourceType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.cluster;
+
+public enum LeaseResourceType {
+ /**
+ * A Kubernetes ConfigMap.
+ */
+ ConfigMap,
+
+ /**
+ * A Kubernetes Lease (coordination.k8s.io).
+ */
+ Lease
+}
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/ConfigMapLockUtils.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/ConfigMapLockUtils.java
deleted file mode 100644
index ee14a84..0000000
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/ConfigMapLockUtils.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.cluster.lock;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Set;
-
-import io.fabric8.kubernetes.api.model.ConfigMap;
-import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utilities for managing ConfigMaps that contain lock information.
- */
-public final class ConfigMapLockUtils {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ConfigMapLockUtils.class);
-
- private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ssX";
-
- private static final String LEADER_PREFIX = "leader.pod.";
-
- private static final String LOCAL_TIMESTAMP_PREFIX =
"leader.local.timestamp.";
-
- private ConfigMapLockUtils() {
- }
-
- public static ConfigMap createNewConfigMap(String configMapName,
LeaderInfo leaderInfo) {
- return new
ConfigMapBuilder().withNewMetadata().withName(configMapName).addToLabels("provider",
"camel")
- .addToLabels("kind", "locks").endMetadata()
- .addToData(LEADER_PREFIX + leaderInfo.getGroupName(),
leaderInfo.getLeader())
- .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(),
formatDate(leaderInfo.getLocalTimestamp()))
- .build();
- }
-
- public static ConfigMap getConfigMapWithNewLeader(ConfigMap configMap,
LeaderInfo leaderInfo) {
- return new ConfigMapBuilder(configMap).addToData(LEADER_PREFIX +
leaderInfo.getGroupName(), leaderInfo.getLeader())
- .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(),
formatDate(leaderInfo.getLocalTimestamp()))
- .build();
- }
-
- public static LeaderInfo getLeaderInfo(ConfigMap configMap, Set<String>
members, String group) {
- return new LeaderInfo(group, getLeader(configMap, group),
getLocalTimestamp(configMap, group), members);
- }
-
- private static String getLeader(ConfigMap configMap, String group) {
- return getConfigMapValue(configMap, LEADER_PREFIX + group);
- }
-
- private static String formatDate(Date date) {
- if (date == null) {
- return null;
- }
- try {
- return new SimpleDateFormat(DATE_TIME_FORMAT).format(date);
- } catch (Exception e) {
- LOG.warn("Unable to format date '{}' using format {}", date,
DATE_TIME_FORMAT, e);
- }
-
- return null;
- }
-
- private static Date getLocalTimestamp(ConfigMap configMap, String group) {
- String timestamp = getConfigMapValue(configMap, LOCAL_TIMESTAMP_PREFIX
+ group);
- if (timestamp == null) {
- return null;
- }
-
- try {
- return new SimpleDateFormat(DATE_TIME_FORMAT).parse(timestamp);
- } catch (Exception e) {
- LOG.warn("Unable to parse time string '{}' using format {}",
timestamp, DATE_TIME_FORMAT, e);
- }
-
- return null;
- }
-
- private static String getConfigMapValue(ConfigMap configMap, String key) {
- if (configMap == null || configMap.getData() == null) {
- return null;
- }
- return configMap.getData().get(key);
- }
-
-}
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeadershipController.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeadershipController.java
index 8c6c017..69fa5e4 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeadershipController.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeadershipController.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.camel.CamelContext;
@@ -45,16 +45,18 @@ public class KubernetesLeadershipController implements
Service {
private enum State {
NOT_LEADER,
BECOMING_LEADER,
- LEADER
+ LEADER,
+ LOSING_LEADERSHIP,
+ LEADERSHIP_LOST
}
- private CamelContext camelContext;
+ private final CamelContext camelContext;
- private KubernetesClient kubernetesClient;
+ private final KubernetesClient kubernetesClient;
- private KubernetesLockConfiguration lockConfiguration;
+ private final KubernetesLockConfiguration lockConfiguration;
- private KubernetesClusterEventHandler eventHandler;
+ private final KubernetesClusterEventHandler eventHandler;
private State currentState = State.NOT_LEADER;
@@ -62,10 +64,14 @@ public class KubernetesLeadershipController implements
Service {
private TimedLeaderNotifier leaderNotifier;
+ private final KubernetesLeaseResourceManager<HasMetadata> leaseManager;
+
private volatile LeaderInfo latestLeaderInfo;
- private volatile ConfigMap latestConfigMap;
+ private volatile HasMetadata latestLeaseResource;
private volatile Set<String> latestMembers;
+ private boolean disabled;
+
public KubernetesLeadershipController(CamelContext camelContext,
KubernetesClient kubernetesClient,
KubernetesLockConfiguration
lockConfiguration,
KubernetesClusterEventHandler
eventHandler) {
@@ -73,6 +79,8 @@ public class KubernetesLeadershipController implements
Service {
this.kubernetesClient = kubernetesClient;
this.lockConfiguration = lockConfiguration;
this.eventHandler = eventHandler;
+ this.disabled = false;
+ this.leaseManager =
KubernetesLeaseResourceManager.create(lockConfiguration.getLeaseResourceType());
}
@Override
@@ -102,6 +110,18 @@ public class KubernetesLeadershipController implements
Service {
leaderNotifier = null;
}
+ public boolean isDisabled() {
+ return disabled;
+ }
+
+ public void setDisabled(boolean disabled) {
+ boolean oldState = this.disabled;
+ this.disabled = disabled;
+ if (oldState != disabled && serializedExecutor != null) {
+ serializedExecutor.execute(this::refreshStatus);
+ }
+ }
+
private void refreshStatus() {
switch (currentState) {
case NOT_LEADER:
@@ -113,6 +133,12 @@ public class KubernetesLeadershipController implements
Service {
case LEADER:
refreshStatusLeader();
break;
+ case LOSING_LEADERSHIP:
+ refreshStatusLosingLeadership();
+ break;
+ case LEADERSHIP_LOST:
+ refreshStatusLeadershipLost();
+ break;
default:
throw new RuntimeException("Unsupported state " +
currentState);
}
@@ -132,7 +158,8 @@ public class KubernetesLeadershipController implements
Service {
if (this.latestLeaderInfo.hasEmptyLeader()) {
// There is no previous leader
- LOG.info("{} The cluster has no leaders. Trying to acquire the
leadership...", logPrefix());
+ LOG.info("{} The cluster has no leaders for group {}. Trying to
acquire the leadership...", logPrefix(),
+ this.lockConfiguration.getGroupName());
boolean acquired = tryAcquireLeadership();
if (acquired) {
LOG.info("{} Leadership acquired by current pod with immediate
effect", logPrefix());
@@ -191,7 +218,55 @@ public class KubernetesLeadershipController implements
Service {
this.serializedExecutor.execute(this::refreshStatus);
}
+ /**
+ * This pod is going to manually lose the leadership. It should shutdown
activities and wait a lease amount of time
+ * before giving up the lease.
+ */
+ private void refreshStatusLosingLeadership() {
+ // Wait always the same amount of time before giving up the leadership
+ long delay = this.lockConfiguration.getLeaseDurationMillis();
+ LOG.info("{} Current pod owns the leadership, but it will be lost in
{} seconds...", logPrefix(),
+ new BigDecimal(delay).divide(BigDecimal.valueOf(1000), 2,
BigDecimal.ROUND_HALF_UP));
+
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ LOG.warn("Thread interrupted", e);
+ }
+
+ LOG.info("{} Current pod is losing leadership now...", logPrefix());
+ this.currentState = State.LEADERSHIP_LOST;
+ this.serializedExecutor.execute(this::refreshStatus);
+ }
+
+ /**
+ * Functions are stopped, now lost leadership should be communicated by
freeing up the lease.
+ */
+ private void refreshStatusLeadershipLost() {
+ boolean pulled = lookupNewLeaderInfo();
+ if (!pulled) {
+ rescheduleAfterDelay();
+ return;
+ }
+
+ if (!this.yieldLeadership()) {
+ rescheduleAfterDelay();
+ return;
+ }
+
+ LOG.info("{} Current pod has lost leadership", logPrefix());
+ this.currentState = State.NOT_LEADER;
+ this.serializedExecutor.execute(this::refreshStatus);
+ }
+
private void refreshStatusLeader() {
+ if (this.disabled) {
+ LOG.debug("{} Leadership disabled, pod is going to lose
leadership", logPrefix());
+ this.currentState = State.LOSING_LEADERSHIP;
+ this.serializedExecutor.execute(this::refreshStatus);
+ return;
+ }
+
LOG.debug("{} Pod should be the leader, pulling new data from the
cluster", logPrefix());
long timeBeforePulling = System.currentTimeMillis();
boolean pulled = lookupNewLeaderInfo();
@@ -202,9 +277,15 @@ public class KubernetesLeadershipController implements
Service {
if
(this.latestLeaderInfo.isValidLeader(this.lockConfiguration.getPodName())) {
LOG.debug("{} Current Pod is still the leader", logPrefix());
+
this.leaderNotifier.refreshLeadership(Optional.of(this.lockConfiguration.getPodName()),
timeBeforePulling,
this.lockConfiguration.getRenewDeadlineMillis(),
this.latestLeaderInfo.getMembers());
+
+ HasMetadata newLease =
this.leaseManager.refreshLeaseRenewTime(kubernetesClient,
this.latestLeaseResource,
+ this.lockConfiguration.getRenewDeadlineSeconds());
+ updateLatestLeaderInfo(newLease, this.latestMembers);
+
rescheduleAfterDelay();
return;
} else {
@@ -228,13 +309,17 @@ public class KubernetesLeadershipController implements
Service {
private boolean lookupNewLeaderInfo() {
LOG.debug("{} Looking up leadership information...", logPrefix());
- ConfigMap configMap;
+ HasMetadata leaseResource;
try {
- configMap = pullConfigMap();
+ leaseResource = leaseManager.fetchLeaseResource(kubernetesClient,
+
this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient),
+ this.lockConfiguration.getKubernetesResourceName(),
+ this.lockConfiguration.getGroupName());
} catch (Throwable e) {
- LOG.warn(logPrefix() + " Unable to retrieve the current ConfigMap
" + this.lockConfiguration.getConfigMapName()
- + " from Kubernetes");
- LOG.debug(logPrefix() + " Exception thrown during ConfigMap
lookup", e);
+ LOG.warn(logPrefix() + " Unable to retrieve the current lease
resource "
+ + this.lockConfiguration.getKubernetesResourceName()
+ + " for group " + this.lockConfiguration.getGroupName() +
" from Kubernetes");
+ LOG.debug(logPrefix() + " Exception thrown during lease resource
lookup", e);
return false;
}
@@ -247,14 +332,62 @@ public class KubernetesLeadershipController implements
Service {
return false;
}
- updateLatestLeaderInfo(configMap, members);
+ updateLatestLeaderInfo(leaseResource, members);
return true;
}
+ private boolean yieldLeadership() {
+ LOG.debug("{} Trying to yield the leadership...", logPrefix());
+
+ HasMetadata leaseResource = this.latestLeaseResource;
+ Set<String> members = this.latestMembers;
+ LeaderInfo latestLeaderInfo = this.latestLeaderInfo;
+
+ if (latestLeaderInfo == null || members == null) {
+ LOG.warn(logPrefix() + " Unexpected condition. Latest leader info
or list of members is empty.");
+ return false;
+ } else if (!members.contains(this.lockConfiguration.getPodName())) {
+ LOG.warn(logPrefix() + " The list of cluster members " +
latestLeaderInfo.getMembers()
+ + " does not contain the current Pod. Cannot yield the
leadership.");
+ return false;
+ }
+
+ if (leaseResource == null) {
+ // Already yielded
+ return true;
+ }
+
+ LOG.debug("{} Lock lease resource already present in the Kubernetes
namespace. Checking...", logPrefix());
+ LeaderInfo leaderInfo = leaseManager.decodeLeaderInfo(leaseResource,
members, this.lockConfiguration.getGroupName());
+ if (!leaderInfo.isValidLeader(this.lockConfiguration.getPodName())) {
+ // Already yielded
+ return true;
+ }
+
+ try {
+ HasMetadata updatedLeaseResource =
leaseManager.optimisticDeleteLeaderInfo(kubernetesClient, leaseResource,
+ this.lockConfiguration.getGroupName());
+
+ LOG.debug("{} Lease resource {} for group {} successfully
updated", logPrefix(),
+ this.lockConfiguration.getKubernetesResourceName(),
this.lockConfiguration.getGroupName());
+ updateLatestLeaderInfo(updatedLeaseResource, members);
+ return true;
+ } catch (Exception ex) {
+ LOG.warn(logPrefix() + " Unable to update the lock on the lease
resource to remove leadership information");
+ LOG.debug(logPrefix() + " Error received during resource lock
replace", ex);
+ return false;
+ }
+ }
+
private boolean tryAcquireLeadership() {
+ if (this.disabled) {
+ LOG.debug("{} Won't try to acquire the leadership because it's
disabled...", logPrefix());
+ return false;
+ }
+
LOG.debug("{} Trying to acquire the leadership...", logPrefix());
- ConfigMap configMap = this.latestConfigMap;
+ HasMetadata leaseResource = this.latestLeaseResource;
Set<String> members = this.latestMembers;
LeaderInfo latestLeaderInfo = this.latestLeaderInfo;
@@ -267,53 +400,53 @@ public class KubernetesLeadershipController implements
Service {
return false;
}
- // Info we would set set in the configmap to become leaders
+ // Info we would set set in the lease resource to become leaders
LeaderInfo newLeaderInfo = new LeaderInfo(
- this.lockConfiguration.getGroupName(),
this.lockConfiguration.getPodName(), new Date(), members);
+ this.lockConfiguration.getGroupName(),
this.lockConfiguration.getPodName(), new Date(), members,
+ this.lockConfiguration.getLeaseDurationSeconds());
- if (configMap == null) {
- // No ConfigMap created so far
- LOG.debug("{} Lock configmap is not present in the Kubernetes
namespace. A new ConfigMap will be created",
+ if (leaseResource == null) {
+ // No leaseResource created so far
+ LOG.debug("{} Lock lease resource is not present in the Kubernetes
namespace. A new lease resource will be created",
logPrefix());
- ConfigMap newConfigMap
- =
ConfigMapLockUtils.createNewConfigMap(this.lockConfiguration.getConfigMapName(),
newLeaderInfo);
try {
- kubernetesClient.configMaps()
-
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
- .create(newConfigMap);
-
- LOG.debug("{} ConfigMap {} successfully created", logPrefix(),
this.lockConfiguration.getConfigMapName());
- updateLatestLeaderInfo(newConfigMap, members);
+ HasMetadata newLeaseResource =
leaseManager.createNewLeaseResource(kubernetesClient,
+
this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient),
+ this.lockConfiguration.getKubernetesResourceName(),
+ newLeaderInfo);
+
+ LOG.debug("{} Lease resource {} successfully created for group
{}", logPrefix(),
+ this.lockConfiguration.getKubernetesResourceName(),
newLeaderInfo.getGroupName());
+ updateLatestLeaderInfo(newLeaseResource, members);
return true;
} catch (Exception ex) {
// Suppress exception
LOG.warn(logPrefix()
- + " Unable to create the ConfigMap, it may have been
created by other cluster members concurrently. If the problem persists, check
if the service account has "
+ + " Unable to create the lease resource, it may have
been created by other cluster members concurrently. If the problem persists,
check if the service account has "
+ "the right " + "permissions to create it");
- LOG.debug(logPrefix() + " Exception while trying to create the
ConfigMap", ex);
+ LOG.debug(logPrefix() + " Exception while trying to create the
lease resource", ex);
return false;
}
} else {
- LOG.debug("{} Lock configmap already present in the Kubernetes
namespace. Checking...", logPrefix());
- LeaderInfo leaderInfo =
ConfigMapLockUtils.getLeaderInfo(configMap, members,
this.lockConfiguration.getGroupName());
+ LOG.debug("{} Lock lease resource already present in the
Kubernetes namespace. Checking...", logPrefix());
+ LeaderInfo leaderInfo
+ = leaseManager.decodeLeaderInfo(leaseResource, members,
this.lockConfiguration.getGroupName());
boolean canAcquire = !leaderInfo.hasValidLeader();
if (canAcquire) {
// Try to be the new leader
try {
- ConfigMap updatedConfigMap =
ConfigMapLockUtils.getConfigMapWithNewLeader(configMap, newLeaderInfo);
- kubernetesClient.configMaps()
-
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-
.withName(this.lockConfiguration.getConfigMapName())
-
.lockResourceVersion(configMap.getMetadata().getResourceVersion()).replace(updatedConfigMap);
-
- LOG.debug("{} ConfigMap {} successfully updated",
logPrefix(), this.lockConfiguration.getConfigMapName());
- updateLatestLeaderInfo(updatedConfigMap, members);
+ HasMetadata updatedLeaseResource
+ =
leaseManager.optimisticAcquireLeadership(kubernetesClient, leaseResource,
newLeaderInfo);
+
+ LOG.debug("{} Lease resource {} successfully updated for
group {}", logPrefix(),
+
this.lockConfiguration.getKubernetesResourceName(),
newLeaderInfo.getGroupName());
+ updateLatestLeaderInfo(updatedLeaseResource, members);
return true;
} catch (Exception ex) {
- LOG.warn(logPrefix() + " Unable to update the lock
ConfigMap to set leadership information");
- LOG.debug(logPrefix() + " Error received during configmap
lock replace", ex);
+ LOG.warn(logPrefix() + " Unable to update the lock lease
resource to set leadership information");
+ LOG.debug(logPrefix() + " Error received during lease
resource lock replace", ex);
return false;
}
} else {
@@ -325,20 +458,14 @@ public class KubernetesLeadershipController implements
Service {
}
}
- private void updateLatestLeaderInfo(ConfigMap configMap, Set<String>
members) {
+ private void updateLatestLeaderInfo(HasMetadata leaseResource, Set<String>
members) {
LOG.debug("{} Updating internal status about the current leader",
logPrefix());
- this.latestConfigMap = configMap;
+ this.latestLeaseResource = leaseResource;
this.latestMembers = members;
- this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap,
members, this.lockConfiguration.getGroupName());
+ this.latestLeaderInfo = leaseManager.decodeLeaderInfo(leaseResource,
members, this.lockConfiguration.getGroupName());
LOG.debug("{} Current leader info: {}", logPrefix(),
this.latestLeaderInfo);
}
- private ConfigMap pullConfigMap() {
- return kubernetesClient.configMaps()
-
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
- .withName(this.lockConfiguration.getConfigMapName()).get();
- }
-
private Set<String> pullClusterMembers() {
List<Pod> pods = kubernetesClient.pods()
.inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeaseResourceManager.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeaseResourceManager.java
new file mode 100644
index 0000000..0ab7b7f
--- /dev/null
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLeaseResourceManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.cluster.lock;
+
+import java.util.Set;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.kubernetes.cluster.LeaseResourceType;
+import
org.apache.camel.component.kubernetes.cluster.lock.impl.ConfigMapLeaseResourceManager;
+import
org.apache.camel.component.kubernetes.cluster.lock.impl.NativeLeaseResourceManager;
+
+/**
+ * Handles the actual interaction with Kubernetes resources, allowing
different implementation to be plugged.
+ */
+public interface KubernetesLeaseResourceManager<T extends HasMetadata> {
+
+ /**
+ * Create a new {@link KubernetesLeaseResourceManager} of the given {@link
LeaseResourceType}.
+ */
+ @SuppressWarnings("unchecked")
+ static <S extends HasMetadata> KubernetesLeaseResourceManager<S>
create(LeaseResourceType type) {
+ switch (type) {
+ case ConfigMap:
+ return (KubernetesLeaseResourceManager<S>) new
ConfigMapLeaseResourceManager();
+ case Lease:
+ return (KubernetesLeaseResourceManager<S>) new
NativeLeaseResourceManager();
+ default:
+ throw new RuntimeCamelException("Unsupported lease resource
type " + type);
+ }
+ }
+
+ /**
+ * Return a {@link LeaderInfo} object from the underlying Kubernetes
resource.
+ */
+ LeaderInfo decodeLeaderInfo(T leaseResource, Set<String> members, String
group);
+
+ /**
+ * Fetch the lease resource for the given name and group.
+ */
+ T fetchLeaseResource(KubernetesClient client, String namespace, String
leaseResourceName, String group);
+
+ /**
+ * Delete leadership information for the given lease resource and group.
+ */
+ T optimisticDeleteLeaderInfo(KubernetesClient client, T leaseResource,
String group);
+
+ /**
+ * Set the leadership information on the lease resource to match the given
{@link LeaderInfo}.
+ */
+ T optimisticAcquireLeadership(KubernetesClient client, T leaseResource,
LeaderInfo newLeaderInfo);
+
+ /**
+ * Create a new lease resource matching the given {@link LeaderInfo}.
+ */
+ T createNewLeaseResource(KubernetesClient client, String namespace, String
leaseResourceName, LeaderInfo leaderInfo);
+
+ /**
+ * Update information on the lease resource to increase the renew time (if
last renewal has occurred more than
+ * minUpdateIntervalSeconds seconds ago).
+ */
+ T refreshLeaseRenewTime(KubernetesClient client, T leaseResource, int
minUpdateIntervalSeconds);
+
+}
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLockConfiguration.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLockConfiguration.java
index de9999b..70c52f8 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLockConfiguration.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/KubernetesLockConfiguration.java
@@ -20,18 +20,25 @@ import java.util.HashMap;
import java.util.Map;
import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.camel.component.kubernetes.cluster.LeaseResourceType;
/**
* Configuration for Kubernetes Lock.
*/
public class KubernetesLockConfiguration implements Cloneable {
- public static final String DEFAULT_CONFIGMAP_NAME = "leaders";
+ public static final LeaseResourceType DEFAULT_LEASE_RESOURCE_TYPE =
LeaseResourceType.Lease;
+ public static final String DEFAULT_RESOURCE_NAME = "leaders";
public static final double DEFAULT_JITTER_FACTOR = 1.2;
- public static final long DEFAULT_LEASE_DURATION_MILLIS = 30000;
- public static final long DEFAULT_RENEW_DEADLINE_MILLIS = 20000;
- public static final long DEFAULT_RETRY_PERIOD_MILLIS = 5000;
+ public static final long DEFAULT_LEASE_DURATION_MILLIS = 15000;
+ public static final long DEFAULT_RENEW_DEADLINE_MILLIS = 10000;
+ public static final long DEFAULT_RETRY_PERIOD_MILLIS = 2000;
+
+ /**
+ * Kubernetes resource type used to hold the leases.
+ */
+ private LeaseResourceType leaseResourceType = DEFAULT_LEASE_RESOURCE_TYPE;
/**
* Kubernetes namespace containing the pods and the ConfigMap used for
locking.
@@ -39,9 +46,9 @@ public class KubernetesLockConfiguration implements Cloneable
{
private String kubernetesResourcesNamespace;
/**
- * Name of the ConfigMap used for locking.
+ * Name of the resource used for locking (or prefix, in case multiple ones
are used).
*/
- private String configMapName = DEFAULT_CONFIGMAP_NAME;
+ private String kubernetesResourceName = DEFAULT_RESOURCE_NAME;
/**
* Name of the lock group (or namespace according to the Camel cluster
convention) within the chosen ConfigMap.
@@ -82,6 +89,14 @@ public class KubernetesLockConfiguration implements
Cloneable {
public KubernetesLockConfiguration() {
}
+ public LeaseResourceType getLeaseResourceType() {
+ return leaseResourceType;
+ }
+
+ public void setLeaseResourceType(LeaseResourceType leaseResourceType) {
+ this.leaseResourceType = leaseResourceType;
+ }
+
public String getKubernetesResourcesNamespaceOrDefault(KubernetesClient
kubernetesClient) {
if (kubernetesResourcesNamespace != null) {
return kubernetesResourcesNamespace;
@@ -97,12 +112,30 @@ public class KubernetesLockConfiguration implements
Cloneable {
this.kubernetesResourcesNamespace = kubernetesResourcesNamespace;
}
+ /**
+ * @return the resource name
+ * @deprecated Use {@link #getKubernetesResourceName()}
+ */
+ @Deprecated
public String getConfigMapName() {
- return configMapName;
+ return kubernetesResourceName;
}
- public void setConfigMapName(String configMapName) {
- this.configMapName = configMapName;
+ /**
+ * @param kubernetesResourceName the resource name
+ * @deprecated Use {@link
#setKubernetesResourceName(String)}
+ */
+ @Deprecated
+ public void setConfigMapName(String kubernetesResourceName) {
+ this.kubernetesResourceName = kubernetesResourceName;
+ }
+
+ public String getKubernetesResourceName() {
+ return kubernetesResourceName;
+ }
+
+ public void setKubernetesResourceName(String kubernetesResourceName) {
+ this.kubernetesResourceName = kubernetesResourceName;
}
public String getGroupName() {
@@ -141,6 +174,10 @@ public class KubernetesLockConfiguration implements
Cloneable {
this.jitterFactor = jitterFactor;
}
+ public int getLeaseDurationSeconds() {
+ return (int) (getLeaseDurationMillis() / 1000);
+ }
+
public long getLeaseDurationMillis() {
return leaseDurationMillis;
}
@@ -149,6 +186,10 @@ public class KubernetesLockConfiguration implements
Cloneable {
this.leaseDurationMillis = leaseDurationMillis;
}
+ public int getRenewDeadlineSeconds() {
+ return (int) (getRenewDeadlineMillis() / 1000);
+ }
+
public long getRenewDeadlineMillis() {
return renewDeadlineMillis;
}
@@ -178,7 +219,7 @@ public class KubernetesLockConfiguration implements
Cloneable {
public String toString() {
final StringBuilder sb = new
StringBuilder("KubernetesLockConfiguration{");
sb.append("kubernetesResourcesNamespace='").append(kubernetesResourcesNamespace).append('\'');
- sb.append(", configMapName='").append(configMapName).append('\'');
+ sb.append(",
kubernetesResourceName='").append(kubernetesResourceName).append('\'');
sb.append(", groupName='").append(groupName).append('\'');
sb.append(", podName='").append(podName).append('\'');
sb.append(", clusterLabels=").append(clusterLabels);
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/LeaderInfo.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/LeaderInfo.java
index f19a075..97b2b47 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/LeaderInfo.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/LeaderInfo.java
@@ -34,14 +34,17 @@ public class LeaderInfo {
private Set<String> members;
+ private Integer leaseDurationSeconds;
+
public LeaderInfo() {
}
- public LeaderInfo(String groupName, String leader, Date timestamp,
Set<String> members) {
+ public LeaderInfo(String groupName, String leader, Date timestamp,
Set<String> members, Integer leaseDurationSeconds) {
this.groupName = groupName;
this.leader = leader;
this.localTimestamp = timestamp;
this.members = members;
+ this.leaseDurationSeconds = leaseDurationSeconds;
}
public boolean hasEmptyLeader() {
@@ -89,6 +92,14 @@ public class LeaderInfo {
this.members = members;
}
+ public Integer getLeaseDurationSeconds() {
+ return leaseDurationSeconds;
+ }
+
+ public void setLeaseDurationSeconds(Integer leaseDurationSeconds) {
+ this.leaseDurationSeconds = leaseDurationSeconds;
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("LeaderInfo{");
@@ -96,6 +107,7 @@ public class LeaderInfo {
sb.append(", leader='").append(leader).append('\'');
sb.append(", localTimestamp=").append(localTimestamp);
sb.append(", members=").append(members);
+ sb.append(", leaseDurationSeconds=").append(leaseDurationSeconds);
sb.append('}');
return sb.toString();
}
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/TimedLeaderNotifier.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/TimedLeaderNotifier.java
index 5b49fef..650d592 100644
---
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/TimedLeaderNotifier.java
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/TimedLeaderNotifier.java
@@ -165,12 +165,8 @@ public class TimedLeaderNotifier implements Service {
lastCommunicatedLeader = newLeader;
LOG.info("The cluster has a new leader: {}", newLeader);
try {
- handler.onKubernetesClusterEvent(new
KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent() {
- @Override
- public Optional<String> getData() {
- return newLeader;
- }
- });
+ handler.onKubernetesClusterEvent(
+
(KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) () -> newLeader);
} catch (Throwable t) {
LOG.warn("Error while communicating the new leader to the
handler", t);
}
@@ -181,12 +177,8 @@ public class TimedLeaderNotifier implements Service {
lastCommunicatedMembers = newMembers;
LOG.info("The list of cluster members has changed: {}",
newMembers);
try {
- handler.onKubernetesClusterEvent(new
KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent() {
- @Override
- public Set<String> getData() {
- return newMembers;
- }
- });
+ handler.onKubernetesClusterEvent(
+
(KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) () ->
newMembers);
} catch (Throwable t) {
LOG.warn("Error while communicating the cluster members to the
handler", t);
}
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/ConfigMapLeaseResourceManager.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/ConfigMapLeaseResourceManager.java
new file mode 100644
index 0000000..66e6b2e
--- /dev/null
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/ConfigMapLeaseResourceManager.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.cluster.lock.impl;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Set;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import
org.apache.camel.component.kubernetes.cluster.lock.KubernetesLeaseResourceManager;
+import org.apache.camel.component.kubernetes.cluster.lock.LeaderInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConfigMapLeaseResourceManager implements
KubernetesLeaseResourceManager<ConfigMap> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ConfigMapLeaseResourceManager.class);
+
+ private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ssX";
+
+ private static final String LEADER_PREFIX = "leader.pod.";
+
+ private static final String LOCAL_TIMESTAMP_PREFIX =
"leader.local.timestamp.";
+
+ @Override
+ public LeaderInfo decodeLeaderInfo(ConfigMap configMap, Set<String>
members, String group) {
+ return new LeaderInfo(group, getLeader(configMap, group),
getLocalTimestamp(configMap, group), members, null);
+ }
+
+ @Override
+ public ConfigMap fetchLeaseResource(KubernetesClient client, String
namespace, String name, String group) {
+ return client.configMaps()
+ .inNamespace(namespace)
+ .withName(name).get();
+ }
+
+ @Override
+ public ConfigMap optimisticDeleteLeaderInfo(KubernetesClient client,
ConfigMap leaseResource, String group) {
+ ConfigMap updatedConfigMap = getConfigMapWithoutLeader(leaseResource,
group);
+ return client.configMaps()
+ .inNamespace(leaseResource.getMetadata().getNamespace())
+ .withName(leaseResource.getMetadata().getName())
+
.lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedConfigMap);
+ }
+
+ @Override
+ public ConfigMap optimisticAcquireLeadership(KubernetesClient client,
ConfigMap leaseResource, LeaderInfo newLeaderInfo) {
+ ConfigMap updatedConfigMap = getConfigMapWithNewLeader(leaseResource,
newLeaderInfo);
+ return client.configMaps()
+ .inNamespace(leaseResource.getMetadata().getNamespace())
+ .withName(leaseResource.getMetadata().getName())
+
.lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedConfigMap);
+ }
+
+ @Override
+ public ConfigMap createNewLeaseResource(
+ KubernetesClient client, String namespace, String
leaseResourceName, LeaderInfo leaderInfo) {
+ ConfigMap newConfigMap
+ = new
ConfigMapBuilder().withNewMetadata().withName(leaseResourceName).addToLabels("provider",
"camel")
+ .addToLabels("kind", "locks").endMetadata()
+ .addToData(LEADER_PREFIX + leaderInfo.getGroupName(),
leaderInfo.getLeader())
+ .addToData(LOCAL_TIMESTAMP_PREFIX +
leaderInfo.getGroupName(),
+ formatDate(leaderInfo.getLocalTimestamp()))
+ .build();
+
+ return client.configMaps()
+ .inNamespace(namespace)
+ .create(newConfigMap);
+ }
+
+ @Override
+ public ConfigMap refreshLeaseRenewTime(KubernetesClient client, ConfigMap
leaseResource, int minUpdateIntervalSeconds) {
+ // Configmap does not store renew information
+ return leaseResource;
+ }
+
+ private static ConfigMap getConfigMapWithNewLeader(ConfigMap configMap,
LeaderInfo leaderInfo) {
+ return new ConfigMapBuilder(configMap).addToData(LEADER_PREFIX +
leaderInfo.getGroupName(), leaderInfo.getLeader())
+ .addToData(LOCAL_TIMESTAMP_PREFIX + leaderInfo.getGroupName(),
formatDate(leaderInfo.getLocalTimestamp()))
+ .build();
+ }
+
+ private static ConfigMap getConfigMapWithoutLeader(ConfigMap configMap,
String group) {
+ return new ConfigMapBuilder(configMap).removeFromData(LEADER_PREFIX +
group)
+ .removeFromData(LOCAL_TIMESTAMP_PREFIX + group)
+ .build();
+ }
+
+ private static Date getLocalTimestamp(ConfigMap configMap, String group) {
+ String timestamp = getConfigMapValue(configMap, LOCAL_TIMESTAMP_PREFIX
+ group);
+ if (timestamp == null) {
+ return null;
+ }
+
+ try {
+ return new SimpleDateFormat(DATE_TIME_FORMAT).parse(timestamp);
+ } catch (Exception e) {
+ LOG.warn("Unable to parse time string '" + timestamp + "' using
format " + DATE_TIME_FORMAT, e);
+ }
+
+ return null;
+ }
+
+ private static String getLeader(ConfigMap configMap, String group) {
+ return getConfigMapValue(configMap, LEADER_PREFIX + group);
+ }
+
+ private static String getConfigMapValue(ConfigMap configMap, String key) {
+ if (configMap == null || configMap.getData() == null) {
+ return null;
+ }
+ return configMap.getData().get(key);
+ }
+
+ private static String formatDate(Date date) {
+ if (date == null) {
+ return null;
+ }
+ try {
+ return new SimpleDateFormat(DATE_TIME_FORMAT).format(date);
+ } catch (Exception e) {
+ LOG.warn("Unable to format date '" + date + "' using format " +
DATE_TIME_FORMAT, e);
+ }
+
+ return null;
+ }
+
+}
diff --git
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/NativeLeaseResourceManager.java
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/NativeLeaseResourceManager.java
new file mode 100644
index 0000000..1400208
--- /dev/null
+++
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/cluster/lock/impl/NativeLeaseResourceManager.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.cluster.lock.impl;
+
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.Set;
+
+import io.fabric8.kubernetes.api.model.coordination.v1.Lease;
+import io.fabric8.kubernetes.api.model.coordination.v1.LeaseBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import
org.apache.camel.component.kubernetes.cluster.lock.KubernetesLeaseResourceManager;
+import org.apache.camel.component.kubernetes.cluster.lock.LeaderInfo;
+
+public class NativeLeaseResourceManager implements
KubernetesLeaseResourceManager<Lease> {
+
+ @Override
+ public LeaderInfo decodeLeaderInfo(Lease lease, Set<String> members,
String group) {
+ return new LeaderInfo(group, getLeader(lease),
getLocalTimestamp(lease), members, getLeaseDurationSeconds(lease));
+ }
+
+ @Override
+ public Lease fetchLeaseResource(KubernetesClient client, String namespace,
String name, String group) {
+ return client.leases()
+ .inNamespace(namespace)
+ .withName(leaseResourceName(name, group)).get();
+ }
+
+ @Override
+ public Lease optimisticDeleteLeaderInfo(KubernetesClient client, Lease
leaseResource, String group) {
+ Lease updatedLease = getLeaseWithoutLeader(leaseResource);
+ return client.leases()
+ .inNamespace(leaseResource.getMetadata().getNamespace())
+ .withName(leaseResource.getMetadata().getName())
+
.lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedLease);
+ }
+
+ @Override
+ public Lease optimisticAcquireLeadership(KubernetesClient client, Lease
leaseResource, LeaderInfo newLeaderInfo) {
+ Lease updatedLease = getLeaseWithNewLeader(leaseResource,
newLeaderInfo);
+ return client.leases()
+ .inNamespace(leaseResource.getMetadata().getNamespace())
+ .withName(leaseResource.getMetadata().getName())
+
.lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedLease);
+ }
+
+ @Override
+ public Lease refreshLeaseRenewTime(KubernetesClient client, Lease
leaseResource, int minUpdateIntervalSeconds) {
+ ZonedDateTime lastRenew = leaseResource.getSpec() != null ?
leaseResource.getSpec().getRenewTime() : null;
+ if (lastRenew == null ||
lastRenew.plusSeconds(minUpdateIntervalSeconds).isBefore(ZonedDateTime.now())) {
+ Lease updatedLease = new LeaseBuilder(leaseResource)
+ .editOrNewSpec()
+ .withRenewTime(ZonedDateTime.now())
+ .endSpec()
+ .build();
+ return client.leases()
+ .inNamespace(leaseResource.getMetadata().getNamespace())
+ .withName(leaseResource.getMetadata().getName())
+
.lockResourceVersion(leaseResource.getMetadata().getResourceVersion()).replace(updatedLease);
+ }
+ return leaseResource;
+ }
+
+ @Override
+ public Lease createNewLeaseResource(KubernetesClient client, String
namespace, String prefix, LeaderInfo leaderInfo) {
+ ZonedDateTime now = ZonedDateTime.now();
+ Lease newLease = new LeaseBuilder().withNewMetadata()
+ .withName(leaseResourceName(prefix, leaderInfo.getGroupName()))
+ .addToLabels("provider", "camel")
+ .endMetadata()
+ .withNewSpec()
+ .withNewHolderIdentity(leaderInfo.getLeader())
+ .withAcquireTime(now)
+ .withLeaseDurationSeconds(leaderInfo.getLeaseDurationSeconds())
+ .withRenewTime(now)
+ .endSpec()
+ .build();
+
+ return client.leases()
+ .inNamespace(namespace)
+ .create(newLease);
+ }
+
+ private static Lease getLeaseWithNewLeader(Lease lease, LeaderInfo
leaderInfo) {
+ Integer transitions = lease.getSpec() != null ?
lease.getSpec().getLeaseTransitions() : null;
+ if (transitions == null) {
+ transitions = 0;
+ }
+ ZonedDateTime now = ZonedDateTime.now();
+ return new LeaseBuilder(lease)
+ .editOrNewSpec()
+ .withNewHolderIdentity(leaderInfo.getLeader())
+ .withAcquireTime(now)
+ .withLeaseDurationSeconds(leaderInfo.getLeaseDurationSeconds())
+ .withRenewTime(now)
+ .withLeaseTransitions(transitions + 1)
+ .endSpec()
+ .build();
+ }
+
+ private static Lease getLeaseWithoutLeader(Lease lease) {
+ return new LeaseBuilder(lease).editOrNewSpec()
+ .withHolderIdentity(null)
+ .withAcquireTime(null)
+ .withRenewTime(null)
+ .withLeaseDurationSeconds(null)
+ .endSpec()
+ .build();
+ }
+
+ private static Date getLocalTimestamp(Lease lease) {
+ if (lease == null || lease.getSpec() == null ||
lease.getSpec().getAcquireTime() == null) {
+ return null;
+ }
+ return Date.from(lease.getSpec().getAcquireTime().toInstant());
+ }
+
+ private static Integer getLeaseDurationSeconds(Lease lease) {
+ if (lease == null || lease.getSpec() == null) {
+ return null;
+ }
+ return lease.getSpec().getLeaseDurationSeconds();
+ }
+
+ private static String getLeader(Lease lease) {
+ if (lease == null || lease.getSpec() == null) {
+ return null;
+ }
+ return lease.getSpec().getHolderIdentity();
+ }
+
+ private static String leaseResourceName(String prefix, String group) {
+ return toValidKubernetesID(prefix + "-" + group);
+ }
+
+ private static String toValidKubernetesID(String id) {
+ id = id.toLowerCase().replaceAll("[^a-z0-9-.]", "-");
+ while (id.length() > 0 && isNonAlphanumeric(id, 0)) {
+ id = id.substring(1);
+ }
+ while (id.length() > 0 && isNonAlphanumeric(id, id.length() - 1)) {
+ id = id.substring(0, id.length() - 1);
+ }
+ return id;
+ }
+
+ private static boolean isNonAlphanumeric(String id, int pos) {
+ return !Character.isAlphabetic(id.charAt(pos)) &&
!Character.isDigit(id.charAt(pos));
+ }
+}
diff --git
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java
index e58d4d1..cdaedc9 100644
---
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java
+++
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java
@@ -16,25 +16,40 @@
*/
package org.apache.camel.component.kubernetes.cluster;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.coordination.v1.LeaseBuilder;
+import org.apache.camel.cluster.CamelPreemptiveClusterService;
import org.apache.camel.component.kubernetes.KubernetesConfiguration;
import
org.apache.camel.component.kubernetes.cluster.utils.ConfigMapLockSimulator;
import org.apache.camel.component.kubernetes.cluster.utils.LeaderRecorder;
+import org.apache.camel.component.kubernetes.cluster.utils.LeaseLockSimulator;
import org.apache.camel.component.kubernetes.cluster.utils.LockTestServer;
+import
org.apache.camel.component.kubernetes.cluster.utils.ResourceLockSimulator;
+import org.apache.camel.support.cluster.RebalancingCamelClusterService;
import org.apache.camel.test.junit5.CamelTestSupport;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -52,31 +67,33 @@ public class KubernetesClusterServiceTest extends
CamelTestSupport {
private static final int RETRY_PERIOD_MILLIS = 200;
private static final double JITTER_FACTOR = 1.1;
- private ConfigMapLockSimulator lockSimulator;
+ private ConfigMapLockSimulator configMapLockSimulator;
+ private Map<String, LeaseLockSimulator> leaseLockSimulators = new
HashMap<>();
- private Map<String, LockTestServer> lockServers;
+ private Map<String, LockTestServer<?>> lockServers = new HashMap<>();
- @BeforeEach
- public void prepareLock() {
- this.lockSimulator = new ConfigMapLockSimulator("leaders");
- this.lockServers = new HashMap<>();
- }
+ private Map<String, CamelPreemptiveClusterService> clusterServices = new
HashMap<>();
@AfterEach
public void shutdownLock() {
- for (LockTestServer server : this.lockServers.values()) {
+ for (LockTestServer<?> server : this.lockServers.values()) {
try {
server.destroy();
} catch (Exception e) {
// can happen in case of delay
}
}
+ this.lockServers = new HashMap<>();
+ configMapLockSimulator = null;
+ leaseLockSimulators = new HashMap<>();
+ clusterServices = new HashMap<>();
}
- @Test
- public void testSimpleLeaderElection() throws Exception {
- LeaderRecorder mypod1 = addMember("mypod1");
- LeaderRecorder mypod2 = addMember("mypod2");
+ @ParameterizedTest
+ @EnumSource(LeaseResourceType.class)
+ public void testSimpleLeaderElection(LeaseResourceType type) throws
Exception {
+ LeaderRecorder mypod1 = addMember("mypod1", type);
+ LeaderRecorder mypod2 = addMember("mypod2", type);
context.start();
mypod1.waitForAnyLeader(5, TimeUnit.SECONDS);
@@ -88,11 +105,12 @@ public class KubernetesClusterServiceTest extends
CamelTestSupport {
assertEquals(mypod2.getCurrentLeader(), leader, "Leaders should be
equals");
}
- @Test
- public void testMultipleMembersLeaderElection() throws Exception {
+ @ParameterizedTest
+ @EnumSource(LeaseResourceType.class)
+ public void testMultipleMembersLeaderElection(LeaseResourceType type)
throws Exception {
int number = 5;
List<LeaderRecorder> members
- = IntStream.range(0, number).mapToObj(i -> addMember("mypod" +
i)).collect(Collectors.toList());
+ = IntStream.range(0, number).mapToObj(i -> addMember("mypod" +
i, type)).collect(Collectors.toList());
context.start();
for (LeaderRecorder member : members) {
@@ -107,10 +125,11 @@ public class KubernetesClusterServiceTest extends
CamelTestSupport {
@Test
public void testSimpleLeaderElectionWithExistingConfigMap() throws
Exception {
- lockSimulator.setConfigMap(new
ConfigMapBuilder().withNewMetadata().withName("leaders").and().build(), true);
+ this.configMapLockSimulator = new ConfigMapLockSimulator("leaders");
+ configMapLockSimulator.setResource(new
ConfigMapBuilder().withNewMetadata().withName("leaders").and().build(), true);
- LeaderRecorder mypod1 = addMember("mypod1");
- LeaderRecorder mypod2 = addMember("mypod2");
+ LeaderRecorder mypod1 = addMember("mypod1",
LeaseResourceType.ConfigMap);
+ LeaderRecorder mypod2 = addMember("mypod2",
LeaseResourceType.ConfigMap);
context.start();
mypod1.waitForAnyLeader(10, TimeUnit.SECONDS);
@@ -122,9 +141,31 @@ public class KubernetesClusterServiceTest extends
CamelTestSupport {
}
@Test
- public void testLeadershipLoss() throws Exception {
- LeaderRecorder mypod1 = addMember("mypod1");
- LeaderRecorder mypod2 = addMember("mypod2");
+ public void testSimpleLeaderElectionWithExistingLeases() throws Exception {
+ LeaseLockSimulator simulator = new
LeaseLockSimulator("leaders-mygroup");
+ simulator.setResource(new LeaseBuilder()
+ .withNewMetadata().withName("leaders-mygroup")
+ .and()
+ .build(), true);
+ this.leaseLockSimulators.put("mygroup", simulator);
+
+ LeaderRecorder mypod1 = addMember("mypod1", "mygroup",
LeaseResourceType.Lease);
+ LeaderRecorder mypod2 = addMember("mypod2", "mygroup",
LeaseResourceType.Lease);
+ context.start();
+
+ mypod1.waitForAnyLeader(10, TimeUnit.SECONDS);
+ mypod2.waitForAnyLeader(10, TimeUnit.SECONDS);
+
+ String leader = mypod1.getCurrentLeader();
+ assertTrue(leader.startsWith("mypod"));
+ assertEquals(mypod2.getCurrentLeader(), leader, "Leaders should be
equals");
+ }
+
+ @ParameterizedTest
+ @EnumSource(LeaseResourceType.class)
+ public void testLeadershipLoss(LeaseResourceType type) throws Exception {
+ LeaderRecorder mypod1 = addMember("mypod1", type);
+ LeaderRecorder mypod2 = addMember("mypod2", type);
context.start();
mypod1.waitForAnyLeader(5, TimeUnit.SECONDS);
@@ -152,10 +193,11 @@ public class KubernetesClusterServiceTest extends
CamelTestSupport {
checkLeadershipChangeDistance((LEASE_TIME_MILLIS -
RENEW_DEADLINE_MILLIS) / 2, TimeUnit.MILLISECONDS, mypod1, mypod2);
}
- @Test
- public void testSlowLeaderLosingLeadershipOnlyInternally() throws
Exception {
- LeaderRecorder mypod1 = addMember("mypod1");
- LeaderRecorder mypod2 = addMember("mypod2");
+ @ParameterizedTest
+ @EnumSource(LeaseResourceType.class)
+ public void testSlowLeaderLosingLeadershipOnlyInternally(LeaseResourceType
type) throws Exception {
+ LeaderRecorder mypod1 = addMember("mypod1", type);
+ LeaderRecorder mypod2 = addMember("mypod2", type);
context.start();
mypod1.waitForAnyLeader(5, TimeUnit.SECONDS);
@@ -173,10 +215,11 @@ public class KubernetesClusterServiceTest extends
CamelTestSupport {
assertEquals(firstLeader, formerLoserRecorder.getCurrentLeader());
}
- @Test
- public void testRecoveryAfterFailure() throws Exception {
- LeaderRecorder mypod1 = addMember("mypod1");
- LeaderRecorder mypod2 = addMember("mypod2");
+ @ParameterizedTest
+ @EnumSource(LeaseResourceType.class)
+ public void testRecoveryAfterFailure(LeaseResourceType type) throws
Exception {
+ LeaderRecorder mypod1 = addMember("mypod1", type);
+ LeaderRecorder mypod2 = addMember("mypod2", type);
context.start();
mypod1.waitForAnyLeader(5, TimeUnit.SECONDS);
@@ -197,10 +240,10 @@ public class KubernetesClusterServiceTest extends
CamelTestSupport {
@Test
public void testSharedConfigMap() throws Exception {
- LeaderRecorder a1 = addMember("a1");
- LeaderRecorder a2 = addMember("a2");
- LeaderRecorder b1 = addMember("b1", "app2");
- LeaderRecorder b2 = addMember("b2", "app2");
+ LeaderRecorder a1 = addMember("a1", LeaseResourceType.ConfigMap);
+ LeaderRecorder a2 = addMember("a2", LeaseResourceType.ConfigMap);
+ LeaderRecorder b1 = addMember("b1", "app2",
LeaseResourceType.ConfigMap);
+ LeaderRecorder b2 = addMember("b2", "app2",
LeaseResourceType.ConfigMap);
context.start();
a1.waitForAnyLeader(5, TimeUnit.SECONDS);
@@ -218,33 +261,110 @@ public class KubernetesClusterServiceTest extends
CamelTestSupport {
assertNotEquals(a1.getCurrentLeader(), b2.getCurrentLeader());
}
+ static Stream<Arguments> rebalancingProvider() {
+ return Stream.of(
+ // LeaseResourceType, pods, partitions, expected partitions
owned, tolerance on owned partitions
+ Arguments.of(LeaseResourceType.Lease, 4, 2, 0, 1),
+ Arguments.of(LeaseResourceType.Lease, 1, 2, 2, 0),
+ Arguments.of(LeaseResourceType.Lease, 2, 2, 1, 0),
+ Arguments.of(LeaseResourceType.ConfigMap, 3, 10, 3, 1),
+ Arguments.of(LeaseResourceType.Lease, 3, 10, 3, 1),
+ Arguments.of(LeaseResourceType.ConfigMap, 6, 23, 3, 1),
+ Arguments.of(LeaseResourceType.Lease, 6, 23, 3, 1));
+ }
+
+ @ParameterizedTest
+ @MethodSource("rebalancingProvider")
+ public void testRebalancing(LeaseResourceType type, int pods, int
partitions, int expectedPartitionsPerPod, int tolerance)
+ throws Exception {
+ Map<String, List<LeaderRecorder>> recorders = createCluster(type,
pods, partitions);
+ context.start();
+
+ waitForAllLeaders(recorders, leaders -> {
+ Map<String, Long> counts = leaders.values().stream()
+ .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
+
+ for (Long count : counts.values()) {
+ if (count < expectedPartitionsPerPod || count >
expectedPartitionsPerPod + tolerance) {
+ return false;
+ }
+ }
+ return true;
+ }, 30, TimeUnit.SECONDS);
+ }
+
+ private Map<String, List<LeaderRecorder>> createCluster(LeaseResourceType
type, int pods, int partitions) {
+ Map<String, List<LeaderRecorder>> recorders = new HashMap<>();
+ for (int i = 0; i < partitions; i++) {
+ String partitionName = "partition-" + i;
+ recorders.put(partitionName, new ArrayList<>());
+ for (int j = 0; j < pods; j++) {
+ recorders.get(partitionName).add(addMember("mypod-" + j,
partitionName, type, true));
+ }
+ }
+ return recorders;
+ }
+
+ private void waitForAllLeaders(
+ Map<String, List<LeaderRecorder>> partitionRecorders,
+ Predicate<Map<String, String>> condition, long time, TimeUnit
unit) {
+ Awaitility.waitAtMost(time, unit).until(() -> {
+ Map<String, String> leaders = new HashMap<>();
+ for (String partition : partitionRecorders.keySet()) {
+ String leader = null;
+ for (LeaderRecorder recorder :
partitionRecorders.get(partition)) {
+ String partitionLeader = recorder.getCurrentLeader();
+ if (partitionLeader == null || (leader != null &&
!leader.equals(partitionLeader))) {
+ return false;
+ }
+ leader = partitionLeader;
+ }
+ if (leader == null) {
+ return false;
+ }
+ leaders.put(partition, leader);
+ }
+ return condition.test(leaders);
+ });
+ }
+
+ private void withLockServer(String pod, Consumer<LockTestServer<?>>
consumer) {
+ consumer.accept(this.lockServers.get(pod));
+ }
+
private void delayRequestsFromPod(String pod, long delay, TimeUnit unit) {
-
this.lockServers.get(pod).setDelayRequests(TimeUnit.MILLISECONDS.convert(delay,
unit));
+ withLockServer(pod, server ->
server.setDelayRequests(TimeUnit.MILLISECONDS.convert(delay, unit)));
}
private void refuseRequestsFromPod(String pod) {
- this.lockServers.get(pod).setRefuseRequests(true);
+ withLockServer(pod, server -> server.setRefuseRequests(true));
}
private void allowRequestsFromPod(String pod) {
- this.lockServers.get(pod).setRefuseRequests(false);
+ withLockServer(pod, server -> server.setRefuseRequests(false));
}
private void disconnectPod(String pod) {
- for (LockTestServer server : this.lockServers.values()) {
+ for (LockTestServer<?> server : this.lockServers.values()) {
server.removePod(pod);
}
}
private void connectPod(String pod) {
- for (LockTestServer server : this.lockServers.values()) {
+ for (LockTestServer<?> server : this.lockServers.values()) {
server.addPod(pod);
}
}
+ private void connectSimulator(ResourceLockSimulator<?> lockSimulator) {
+ for (LockTestServer<?> server : this.lockServers.values()) {
+ server.addSimulator(lockSimulator);
+ }
+ }
+
private void checkLeadershipChangeDistance(long minimum, TimeUnit unit,
LeaderRecorder... recorders) {
List<LeaderRecorder.LeadershipInfo> infos =
Arrays.stream(recorders).flatMap(lr -> lr.getLeadershipInfo().stream())
- .sorted((li1, li2) -> Long.compare(li1.getChangeTimestamp(),
li2.getChangeTimestamp()))
+
.sorted(Comparator.comparingLong(LeaderRecorder.LeadershipInfo::getChangeTimestamp))
.collect(Collectors.toList());
LeaderRecorder.LeadershipInfo currentLeaderLastSeen = null;
@@ -266,30 +386,69 @@ public class KubernetesClusterServiceTest extends
CamelTestSupport {
}
}
- private LeaderRecorder addMember(String name) {
- return addMember(name, "app");
+ private LeaderRecorder addMember(String name, LeaseResourceType type) {
+ return addMember(name, "app", type);
}
- private LeaderRecorder addMember(String name, String namespace) {
- assertNull(this.lockServers.get(name));
+ private LeaderRecorder addMember(String name, String namespace,
LeaseResourceType type) {
+ return addMember(name, namespace, type, false);
+ }
- LockTestServer lockServer = new LockTestServer(lockSimulator);
- this.lockServers.put(name, lockServer);
+ private LeaderRecorder addMember(String name, String namespace,
LeaseResourceType type, boolean rebalancing) {
+ ResourceLockSimulator<?> lockSimulator;
+ switch (type) {
+ case ConfigMap:
+ if (this.configMapLockSimulator == null) {
+ this.configMapLockSimulator = new
ConfigMapLockSimulator("leaders");
+ }
+ lockSimulator = this.configMapLockSimulator;
+ break;
+ case Lease:
+ if (!this.leaseLockSimulators.containsKey(namespace)) {
+ this.leaseLockSimulators.put(namespace, new
LeaseLockSimulator("leaders-" + namespace));
+ }
+ lockSimulator = this.leaseLockSimulators.get(namespace);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported
LeaseResourceType " + type);
+ }
- KubernetesConfiguration configuration = new KubernetesConfiguration();
- configuration.setKubernetesClient(lockServer.createClient());
+ if (!this.lockServers.containsKey(name)) {
+ this.lockServers.put(name, new LockTestServer<>());
+ }
+ LockTestServer<?> lockServer = this.lockServers.get(name);
+
+ CamelPreemptiveClusterService member = clusterServices.get(name);
+ if (member == null) {
+ KubernetesConfiguration configuration = new
KubernetesConfiguration();
+ configuration.setKubernetesClient(lockServer.createClient());
+
+ KubernetesClusterService service = new
KubernetesClusterService(configuration);
+ service.setKubernetesNamespace("test");
+ service.setPodName(name);
+ service.setLeaseDurationMillis(LEASE_TIME_MILLIS);
+ service.setRenewDeadlineMillis(RENEW_DEADLINE_MILLIS);
+ service.setRetryPeriodMillis(RETRY_PERIOD_MILLIS);
+ service.setJitterFactor(JITTER_FACTOR);
+ service.setLeaseResourceType(type);
+
+ if (rebalancing) {
+ member = new RebalancingCamelClusterService(service,
RETRY_PERIOD_MILLIS);
+ } else {
+ member = service;
+ }
+
+ try {
+ context().addService(member);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
- KubernetesClusterService member = new
KubernetesClusterService(configuration);
- member.setKubernetesNamespace("test");
- member.setPodName(name);
- member.setLeaseDurationMillis(LEASE_TIME_MILLIS);
- member.setRenewDeadlineMillis(RENEW_DEADLINE_MILLIS);
- member.setRetryPeriodMillis(RETRY_PERIOD_MILLIS);
- member.setJitterFactor(JITTER_FACTOR);
+ clusterServices.put(name, member);
+ }
LeaderRecorder recorder = new LeaderRecorder();
try {
- context().addService(member);
member.getView(namespace).addEventListener(recorder);
} catch (Exception ex) {
throw new RuntimeException(ex);
@@ -297,7 +456,9 @@ public class KubernetesClusterServiceTest extends
CamelTestSupport {
for (String pod : this.lockServers.keySet()) {
connectPod(pod);
+ connectSimulator(lockSimulator);
}
+
return recorder;
}
diff --git
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
index f4e6e21..66dbe31 100644
---
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
+++
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
@@ -18,63 +18,39 @@ package org.apache.camel.component.kubernetes.cluster.utils;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * Central lock for testing leader election.
+ * Central lock for testing leader election based on ConfigMap.
*/
-public class ConfigMapLockSimulator {
+public class ConfigMapLockSimulator extends ResourceLockSimulator<ConfigMap> {
- private static final Logger LOG =
LoggerFactory.getLogger(ConfigMapLockSimulator.class);
-
- private String configMapName;
-
- private ConfigMap currentMap;
-
- private long versionCounter = 1000000;
-
- public ConfigMapLockSimulator(String configMapName) {
- this.configMapName = configMapName;
+ public ConfigMapLockSimulator(String resourceName) {
+ super(resourceName);
}
- public String getConfigMapName() {
- return configMapName;
+ @Override
+ protected ConfigMap withNewResourceVersion(ConfigMap resource, String
newResourceVersion) {
+ return new
ConfigMapBuilder(resource).editOrNewMetadata().withResourceVersion(newResourceVersion)
+ .endMetadata().build();
}
- public synchronized boolean setConfigMap(ConfigMap map, boolean insert) {
- // Insert
- if (insert && currentMap != null) {
- LOG.error("Current map should have been null");
- return false;
- }
-
- // Update
- if (!insert && currentMap == null) {
- LOG.error("Current map should not have been null");
- return false;
- }
- String version = map.getMetadata() != null ?
map.getMetadata().getResourceVersion() : null;
- if (version != null) {
- long versionLong = Long.parseLong(version);
- if (versionLong != versionCounter) {
- LOG.warn("Current resource version is {} while the update is
related to version {}", versionCounter,
- versionLong);
- return false;
- }
- }
-
- this.currentMap = new
ConfigMapBuilder(map).editOrNewMetadata().withResourceVersion(String.valueOf(++versionCounter))
- .endMetadata().build();
- return true;
+ @Override
+ protected ConfigMap copyOf(ConfigMap resource) {
+ return new ConfigMapBuilder(resource).build();
}
- public synchronized ConfigMap getConfigMap() {
- if (currentMap == null) {
- return null;
- }
+ @Override
+ public String getResourcePath() {
+ return "configmaps";
+ }
- return new ConfigMapBuilder(currentMap).build();
+ @Override
+ public String getAPIPath() {
+ return "/api/v1";
}
+ @Override
+ public Class<ConfigMap> getResourceClass() {
+ return ConfigMap.class;
+ }
}
diff --git
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LeaseLockSimulator.java
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LeaseLockSimulator.java
new file mode 100644
index 0000000..3d99656
--- /dev/null
+++
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LeaseLockSimulator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.cluster.utils;
+
+import io.fabric8.kubernetes.api.model.coordination.v1.Lease;
+import io.fabric8.kubernetes.api.model.coordination.v1.LeaseBuilder;
+
+/**
+ * Central lock for testing leader election based on Lease.
+ */
+public class LeaseLockSimulator extends ResourceLockSimulator<Lease> {
+
+ public LeaseLockSimulator(String resourceName) {
+ super(resourceName);
+ }
+
+ @Override
+ protected Lease withNewResourceVersion(Lease resource, String
newResourceVersion) {
+ return new
LeaseBuilder(resource).editOrNewMetadata().withResourceVersion(newResourceVersion)
+ .endMetadata().build();
+ }
+
+ @Override
+ protected Lease copyOf(Lease resource) {
+ return new LeaseBuilder(resource).build();
+ }
+
+ @Override
+ public String getResourcePath() {
+ return "leases";
+ }
+
+ @Override
+ public String getAPIPath() {
+ return "/apis/coordination.k8s.io/v1";
+ }
+
+ @Override
+ public Class<Lease> getResourceClass() {
+ return Lease.class;
+ }
+}
diff --git
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LockTestServer.java
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LockTestServer.java
index bd14acc..1a5bd7a 100644
---
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LockTestServer.java
+++
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/LockTestServer.java
@@ -19,12 +19,15 @@ package org.apache.camel.component.kubernetes.cluster.utils;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
-import io.fabric8.kubernetes.api.model.ConfigMap;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodListBuilder;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -37,7 +40,7 @@ import org.slf4j.LoggerFactory;
/**
* A Test server to interact with Kubernetes for locking on a ConfigMap.
*/
-public class LockTestServer extends KubernetesMockServer {
+public class LockTestServer<T extends HasMetadata> extends
KubernetesMockServer {
private static final Logger LOG =
LoggerFactory.getLogger(LockTestServer.class);
@@ -47,15 +50,106 @@ public class LockTestServer extends KubernetesMockServer {
private Set<String> pods;
- public LockTestServer(ConfigMapLockSimulator lockSimulator) {
- this(lockSimulator, Collections.emptySet());
+ private Map<String, ResourceLockSimulator<T>> simulators;
+
+ public LockTestServer() {
+ this(Collections.emptySet());
}
- public LockTestServer(ConfigMapLockSimulator lockSimulator,
Collection<String> initialPods) {
+ public LockTestServer(Collection<String> initialPods) {
this.pods = new TreeSet<>(initialPods);
+ this.simulators = new HashMap<>();
+
+ // Other resources
+ expect().get().withPath("/api/v1/namespaces/test/pods")
+ .andReply(200,
+ request -> new
PodListBuilder().withNewMetadata().withResourceVersion("1").and()
+ .withItems(getCurrentPods().stream()
+ .map(name -> new
PodBuilder().withNewMetadata().withName(name).and().build())
+ .collect(Collectors.toList()))
+ .build())
+ .always();
- expect().get().withPath("/api/v1/namespaces/test/configmaps/" +
lockSimulator.getConfigMapName())
+ }
+
+ public void addSimulator(ResourceLockSimulator<?> paramLockSimulator) {
+ ResourceLockSimulator<T> lockSimulator = (ResourceLockSimulator<T>)
paramLockSimulator;
+ if (this.simulators.containsKey(lockSimulator.getResourceName())) {
+ return;
+ }
+ this.simulators.put(lockSimulator.getResourceName(), lockSimulator);
+
+ if (this.simulators.size() == 1) {
+ // Global methods defined once
+ expect().post().withPath(lockSimulator.getAPIPath() +
"/namespaces/test/" + lockSimulator.getResourcePath())
+ .andReply(new ResponseProvider<Object>() {
+
+ private Headers headers = new
Headers.Builder().build();
+ private Map<Integer, String> lockNames = new
HashMap<>();
+
+ @Override
+ public int getStatusCode(RecordedRequest request) {
+ if (refuseRequests) {
+ return 500;
+ }
+
+ T resource;
+ try {
+ resource = convert(request,
lockSimulator.getResourceClass());
+ } catch (Exception e) {
+ LOG.error("Error during resource conversion",
e);
+ return 500;
+ }
+
+ if (resource == null) {
+ LOG.error("No resource received");
+ return 500;
+ }
+ ResourceLockSimulator<T> lockSimulator =
simulators.get(resource.getMetadata().getName());
+ if (resource.getMetadata() == null
+ ||
!lockSimulator.getResourceName().equals(resource.getMetadata().getName())) {
+ LOG.error("Illegal resource received");
+ return 500;
+ }
+
+ boolean done = lockSimulator.setResource(resource,
true);
+ if (done) {
+ lockNames.put(request.getSequenceNumber(),
lockSimulator.getResourceName());
+ return 201;
+ }
+ return 500;
+ }
+
+ @Override
+ public Object getBody(RecordedRequest recordedRequest)
{
+ delayIfNecessary();
+
+ if
(lockNames.containsKey(recordedRequest.getSequenceNumber())) {
+ T resource =
simulators.get(lockNames.get(recordedRequest.getSequenceNumber())).getResource();
+ if (resource != null) {
+ return resource;
+ }
+ }
+
+ return "";
+ }
+
+ @Override
+ public Headers getHeaders() {
+ return headers;
+ }
+
+ @Override
+ public void setHeaders(Headers headers) {
+ this.headers = headers;
+ }
+ }).always();
+ }
+
+ expect().get()
+ .withPath(lockSimulator.getAPIPath() + "/namespaces/test/" +
lockSimulator.getResourcePath() + "/"
+ + lockSimulator.getResourceName())
.andReply(new ResponseProvider<Object>() {
private Headers headers = new Headers.Builder().build();
@@ -66,7 +160,7 @@ public class LockTestServer extends KubernetesMockServer {
return 500;
}
- if (lockSimulator.getConfigMap() != null) {
+ if (lockSimulator.getResource() != null) {
return 200;
}
@@ -76,9 +170,9 @@ public class LockTestServer extends KubernetesMockServer {
@Override
public Object getBody(RecordedRequest recordedRequest) {
delayIfNecessary();
- ConfigMap map = lockSimulator.getConfigMap();
- if (map != null) {
- return map;
+ T resource = lockSimulator.getResource();
+ if (resource != null) {
+ return resource;
}
return "";
}
@@ -94,53 +188,9 @@ public class LockTestServer extends KubernetesMockServer {
}
}).always();
-
expect().post().withPath("/api/v1/namespaces/test/configmaps").andReply(new
ResponseProvider<Object>() {
-
- private Headers headers = new Headers.Builder().build();
-
- @Override
- public int getStatusCode(RecordedRequest request) {
- if (refuseRequests) {
- return 500;
- }
-
- ConfigMap map = convert(request);
- if (map == null || map.getMetadata() == null
- ||
!lockSimulator.getConfigMapName().equals(map.getMetadata().getName())) {
- throw new IllegalArgumentException("Illegal configMap
received");
- }
-
- boolean done = lockSimulator.setConfigMap(map, true);
- if (done) {
- return 201;
- }
- return 500;
- }
-
- @Override
- public Object getBody(RecordedRequest recordedRequest) {
- delayIfNecessary();
-
- ConfigMap map = lockSimulator.getConfigMap();
- if (map != null) {
- return map;
- }
-
- return "";
- }
-
- @Override
- public Headers getHeaders() {
- return headers;
- }
-
- @Override
- public void setHeaders(Headers headers) {
- this.headers = headers;
- }
- }).always();
-
- expect().put().withPath("/api/v1/namespaces/test/configmaps/" +
lockSimulator.getConfigMapName())
+ expect().put()
+ .withPath(lockSimulator.getAPIPath() + "/namespaces/test/" +
lockSimulator.getResourcePath() + "/"
+ + lockSimulator.getResourceName())
.andReply(new ResponseProvider<Object>() {
private Headers headers = new Headers.Builder().build();
@@ -151,9 +201,15 @@ public class LockTestServer extends KubernetesMockServer {
return 500;
}
- ConfigMap map = convert(request);
+ T resource;
+ try {
+ resource = convert(request,
lockSimulator.getResourceClass());
+ } catch (Exception e) {
+ LOG.error("Error during resource conversion", e);
+ return 500;
+ }
- boolean done = lockSimulator.setConfigMap(map, false);
+ boolean done = lockSimulator.setResource(resource,
false);
if (done) {
return 200;
}
@@ -163,9 +219,9 @@ public class LockTestServer extends KubernetesMockServer {
@Override
public Object getBody(RecordedRequest recordedRequest) {
delayIfNecessary();
- ConfigMap map = lockSimulator.getConfigMap();
- if (map != null) {
- return map;
+ T resource = lockSimulator.getResource();
+ if (resource != null) {
+ return resource;
}
return "";
@@ -181,16 +237,6 @@ public class LockTestServer extends KubernetesMockServer {
this.headers = headers;
}
}).always();
-
- // Other resources
- expect().get().withPath("/api/v1/namespaces/test/pods")
- .andReply(200,
- request -> new
PodListBuilder().withNewMetadata().withResourceVersion("1").and()
- .withItems(getCurrentPods().stream()
- .map(name -> new
PodBuilder().withNewMetadata().withName(name).and().build())
- .collect(Collectors.toList()))
- .build())
- .always();
}
public boolean isRefuseRequests() {
@@ -231,13 +277,9 @@ public class LockTestServer extends KubernetesMockServer {
}
}
- private ConfigMap convert(RecordedRequest request) {
- try {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(request.getBody().readByteArray(),
ConfigMap.class);
- } catch (IOException e) {
- throw new IllegalArgumentException("Erroneous data", e);
- }
+ private T convert(RecordedRequest request, Class<T> targetClass) throws
IOException {
+ ObjectMapper mapper = new ObjectMapper().registerModule(new
JavaTimeModule());
+ return mapper.readValue(request.getBody().readByteArray(),
targetClass);
}
}
diff --git
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ResourceLockSimulator.java
similarity index 53%
copy from
components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
copy to
components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ResourceLockSimulator.java
index f4e6e21..558f0f8 100644
---
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ConfigMapLockSimulator.java
+++
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/utils/ResourceLockSimulator.java
@@ -16,45 +16,44 @@
*/
package org.apache.camel.component.kubernetes.cluster.utils;
-import io.fabric8.kubernetes.api.model.ConfigMap;
-import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Central lock for testing leader election.
*/
-public class ConfigMapLockSimulator {
+public abstract class ResourceLockSimulator<T extends HasMetadata> {
- private static final Logger LOG =
LoggerFactory.getLogger(ConfigMapLockSimulator.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ResourceLockSimulator.class);
- private String configMapName;
+ private String resourceName;
- private ConfigMap currentMap;
+ private T currentResource;
private long versionCounter = 1000000;
- public ConfigMapLockSimulator(String configMapName) {
- this.configMapName = configMapName;
+ public ResourceLockSimulator(String resourceName) {
+ this.resourceName = resourceName;
}
- public String getConfigMapName() {
- return configMapName;
+ public String getResourceName() {
+ return resourceName;
}
- public synchronized boolean setConfigMap(ConfigMap map, boolean insert) {
+ public synchronized boolean setResource(T resource, boolean insert) {
// Insert
- if (insert && currentMap != null) {
- LOG.error("Current map should have been null");
+ if (insert && currentResource != null) {
+ LOG.error("Current resource should have been null");
return false;
}
// Update
- if (!insert && currentMap == null) {
- LOG.error("Current map should not have been null");
+ if (!insert && currentResource == null) {
+ LOG.error("Current resource should not have been null");
return false;
}
- String version = map.getMetadata() != null ?
map.getMetadata().getResourceVersion() : null;
+ String version = resource.getMetadata() != null ?
resource.getMetadata().getResourceVersion() : null;
if (version != null) {
long versionLong = Long.parseLong(version);
if (versionLong != versionCounter) {
@@ -64,17 +63,26 @@ public class ConfigMapLockSimulator {
}
}
- this.currentMap = new
ConfigMapBuilder(map).editOrNewMetadata().withResourceVersion(String.valueOf(++versionCounter))
- .endMetadata().build();
+ this.currentResource = withNewResourceVersion(resource,
String.valueOf(++versionCounter));
return true;
}
- public synchronized ConfigMap getConfigMap() {
- if (currentMap == null) {
+ public synchronized T getResource() {
+ if (currentResource == null) {
return null;
}
- return new ConfigMapBuilder(currentMap).build();
+ return copyOf(currentResource);
}
+ protected abstract T withNewResourceVersion(T resource, String
newResourceVersion);
+
+ protected abstract T copyOf(T resource);
+
+ public abstract String getAPIPath();
+
+ public abstract String getResourcePath();
+
+ public abstract Class<T> getResourceClass();
+
}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterService.java
b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterService.java
new file mode 100644
index 0000000..652ba67
--- /dev/null
+++
b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterService.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.cluster;
+
+/**
+ * A {@link CamelPreemptiveClusterService} is a {@link CamelClusterService}
that manages
+ * {@link CamelPreemptiveClusterView}s.
+ */
+public interface CamelPreemptiveClusterService extends CamelClusterService {
+
+ @Override
+ CamelPreemptiveClusterView getView(String namespace) throws Exception;
+
+}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterView.java
b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterView.java
new file mode 100644
index 0000000..c8a2136
--- /dev/null
+++
b/core/camel-api/src/main/java/org/apache/camel/cluster/CamelPreemptiveClusterView.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.cluster;
+
+/**
+ * A {@link CamelPreemptiveClusterView} is a {@link CamelClusterView} that can
be externally disabled by another
+ * controller.
+ */
+public interface CamelPreemptiveClusterView extends CamelClusterView {
+
+ /**
+ * Enable or disables a view.
+ */
+ void setDisabled(boolean disabled);
+
+ /**
+ * Check if the view is disabled.
+ */
+ boolean isDisabled();
+
+}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/cluster/RebalancingCamelClusterService.java
b/core/camel-support/src/main/java/org/apache/camel/support/cluster/RebalancingCamelClusterService.java
new file mode 100644
index 0000000..372734f
--- /dev/null
+++
b/core/camel-support/src/main/java/org/apache/camel/support/cluster/RebalancingCamelClusterService.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support.cluster;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.cluster.CamelPreemptiveClusterService;
+import org.apache.camel.cluster.CamelPreemptiveClusterView;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link RebalancingCamelClusterService} adds rebalancing capabilities to
an underlying
+ * {@link CamelPreemptiveClusterService}. Each view is treated as a partition
by this cluster service and it makes sure
+ * that all services belonging to the cluster own a balanced number of
partitions (same number or difference at most 1
+ * when not possible).
+ */
+public class RebalancingCamelClusterService implements
CamelPreemptiveClusterService {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RebalancingCamelClusterService.class);
+
+ protected ScheduledExecutorService serializedExecutor;
+
+ protected CamelPreemptiveClusterService delegate;
+
+ protected CamelContext camelContext;
+
+ protected long periodMillis;
+
+ public RebalancingCamelClusterService(CamelPreemptiveClusterService
delegate, long periodMillis) {
+ this.delegate = ObjectHelper.notNull(delegate, "delegate");
+ this.periodMillis = periodMillis;
+ }
+
+ public RebalancingCamelClusterService(CamelContext camelContext,
CamelPreemptiveClusterService delegate,
+ long periodMillis) {
+ this.camelContext = ObjectHelper.notNull(camelContext, "camelContext");
+ this.delegate = ObjectHelper.notNull(delegate, "delegate");
+ this.periodMillis = periodMillis;
+ }
+
+ @Override
+ public void start() {
+ delegate.start();
+ if (serializedExecutor == null) {
+ serializedExecutor =
getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
+ "RebalancingClusterService");
+ serializedExecutor.execute(this::reconcile);
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (serializedExecutor != null) {
+ serializedExecutor.shutdownNow();
+ }
+ serializedExecutor = null;
+
+ delegate.stop();
+ }
+
+ public CamelPreemptiveClusterService getDelegate() {
+ return delegate;
+ }
+
+ public long getPeriodMillis() {
+ return periodMillis;
+ }
+
+ public void setDelegate(CamelPreemptiveClusterService delegate) {
+ this.delegate = delegate;
+ }
+
+ protected void reconcile() {
+ Integer n = members();
+ List<String> partitions = partitionList();
+ int k = partitions.size();
+
+ if (n == null || n == 0 || k == 0) {
+ rescheduleAfterDelay();
+ return;
+ }
+
+ int threshold = 0;
+ while (threshold <= k) {
+ threshold += n;
+ }
+ threshold -= n;
+
+ int quota = threshold / n;
+
+ List<String> main = new ArrayList<>();
+ List<String> remaining = new ArrayList<>();
+ for (int i = 0; i < threshold; i++) {
+ main.add(partitions.get(i));
+ }
+ for (int i = threshold; i < partitions.size(); i++) {
+ remaining.add(partitions.get(i));
+ }
+
+ rebalanceGroup(main, quota);
+ rebalanceGroup(remaining, 1);
+ rescheduleAfterDelay();
+ }
+
+ protected void rebalanceGroup(List<String> partitions, int quota) {
+ List<String> owned = owned(partitions);
+ if (owned == null) {
+ return;
+ }
+
+ if (owned.size() < quota) {
+ // Open all (to let the controller choose which ones)
+ for (String partition : partitions) {
+ setDisabled(partition, false);
+ }
+ } else if (owned.size() > quota) {
+ for (int i = 0; i < owned.size() - quota; i++) {
+ setDisabled(owned.get(i), true);
+ }
+ } else {
+ // We're fine, but we prevent this instance from stealing locks
that are not needed
+ Set<String> ownedSet = new HashSet<>(owned);
+ for (String partition : partitions) {
+ if (!ownedSet.contains(partition)) {
+ setDisabled(partition, true);
+ }
+ }
+ }
+ }
+
+ protected void setDisabled(String partition, boolean disabled) {
+ try {
+ LOG.debug("Setting partition {} to disabled={}...", partition,
disabled);
+ CamelPreemptiveClusterView view = delegate.getView(partition);
+ if (view.isDisabled() != disabled) {
+ view.setDisabled(disabled);
+ }
+ } catch (Exception ex) {
+ LOG.warn("Could not get view " + partition, ex);
+ }
+ }
+
+ protected List<String> owned(List<String> partitions) {
+ List<String> owned = new ArrayList<>(partitions.size());
+ for (String partition : partitions) {
+ try {
+ CamelPreemptiveClusterView view = delegate.getView(partition);
+ if (!view.isDisabled() && view.getLocalMember().isLeader()) {
+ owned.add(partition);
+ }
+ } catch (Exception ex) {
+ LOG.warn("Could not get view " + partition, ex);
+ return null;
+ }
+ }
+ return owned;
+ }
+
+ protected List<String> partitionList() {
+ ArrayList<String> partitions = new ArrayList<>(this.getNamespaces());
+ Collections.sort(partitions);
+ return partitions;
+ }
+
+ protected Integer members() {
+ Set<String> members = null;
+ for (String group : this.getNamespaces()) {
+ try {
+ CamelPreemptiveClusterView view = delegate.getView(group);
+ Set<String> viewMembers =
view.getMembers().stream().map(CamelClusterMember::getId).collect(Collectors.toSet());
+ if (members != null && !members.equals(viewMembers)) {
+ LOG.debug("View members don't match: {} vs {}", members,
viewMembers);
+ return null;
+ }
+ members = viewMembers;
+ } catch (Exception ex) {
+ LOG.warn("Could not get view " + group, ex);
+ return null;
+ }
+ }
+ return members != null ? members.size() : 0;
+ }
+
+ private void rescheduleAfterDelay() {
+ this.serializedExecutor.schedule(this::reconcile,
+ this.periodMillis,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public CamelPreemptiveClusterView getView(String namespace) throws
Exception {
+ return delegate.getView(namespace);
+ }
+
+ @Override
+ public void releaseView(CamelClusterView view) throws Exception {
+ delegate.releaseView(view);
+ }
+
+ @Override
+ public Collection<String> getNamespaces() {
+ return delegate.getNamespaces();
+ }
+
+ @Override
+ public void startView(String namespace) throws Exception {
+ delegate.startView(namespace);
+ }
+
+ @Override
+ public void stopView(String namespace) throws Exception {
+ delegate.stopView(namespace);
+ }
+
+ @Override
+ public boolean isLeader(String namespace) {
+ return delegate.isLeader(namespace);
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ delegate.setCamelContext(camelContext);
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return this.camelContext;
+ }
+
+ @Override
+ public void setId(String id) {
+ delegate.setId(id);
+ }
+
+ @Override
+ public String getId() {
+ return delegate.getId();
+ }
+}