Repository: camel Updated Branches: refs/heads/master 9fc6d0bda -> a3d0df31d
CAMEL-11331: Using Camel thread pools Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a3d0df31 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a3d0df31 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a3d0df31 Branch: refs/heads/master Commit: a3d0df31dc067d55adf7f654cd545f19a400341e Parents: 4a0f822 Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Tue Aug 8 15:39:22 2017 +0200 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Tue Aug 8 16:39:43 2017 +0200 ---------------------------------------------------------------------- .../kubernetes/ha/KubernetesClusterService.java | 2 +- .../component/kubernetes/ha/KubernetesClusterView.java | 12 ++++++++---- .../ha/lock/KubernetesLeadershipController.java | 11 +++++++---- .../kubernetes/ha/lock/TimedLeaderNotifier.java | 9 ++++++--- .../kubernetes/ha/KubernetesClusterServiceTest.java | 2 +- .../kubernetes/ha/TimedLeaderNotifierTest.java | 10 +++++++++- 6 files changed, 32 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a3d0df31/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java index 1c95b22..00cb04d 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java @@ -57,7 +57,7 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern protected KubernetesClusterView createView(String namespace) throws Exception { KubernetesLockConfiguration lockConfig = lockConfigWithGroupNameAndDefaults(namespace); KubernetesConfiguration config = setConfigDefaults(this.configuration.copy(), lockConfig); - return new KubernetesClusterView(this, config, lockConfig); + return new KubernetesClusterView(getCamelContext(), this, config, lockConfig); } protected KubernetesConfiguration setConfigDefaults(KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) { http://git-wip-us.apache.org/repos/asf/camel/blob/a3d0df31/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java index ddda675..a67b662 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.camel.CamelContext; import org.apache.camel.component.kubernetes.KubernetesConfiguration; import org.apache.camel.component.kubernetes.KubernetesHelper; import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent; @@ -42,6 +43,8 @@ import org.apache.camel.util.ObjectHelper; */ public class KubernetesClusterView extends AbstractCamelClusterView { + private CamelContext camelContext; + private KubernetesClient kubernetesClient; private KubernetesConfiguration configuration; @@ -58,10 +61,11 @@ public class KubernetesClusterView extends AbstractCamelClusterView { private KubernetesLeadershipController controller; - public KubernetesClusterView(KubernetesClusterService cluster, KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) { + public KubernetesClusterView(CamelContext camelContext, KubernetesClusterService cluster, KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) { super(cluster, lockConfiguration.getGroupName()); - this.configuration = configuration; - this.lockConfiguration = lockConfiguration; + this.camelContext = ObjectHelper.notNull(camelContext, "camelContext"); + this.configuration = ObjectHelper.notNull(configuration, "configuration"); + this.lockConfiguration = ObjectHelper.notNull(lockConfiguration, "lockConfiguration"); this.localMember = new KubernetesClusterMember(lockConfiguration.getPodName()); this.memberCache = new HashMap<>(); } @@ -86,7 +90,7 @@ public class KubernetesClusterView extends AbstractCamelClusterView { if (controller == null) { this.kubernetesClient = KubernetesHelper.getKubernetesClient(configuration); - controller = new KubernetesLeadershipController(kubernetesClient, this.lockConfiguration, event -> { + controller = new KubernetesLeadershipController(camelContext, kubernetesClient, this.lockConfiguration, event -> { if (event instanceof KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) { // New leader Optional<String> leader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(event).getData(); http://git-wip-us.apache.org/repos/asf/camel/blob/a3d0df31/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java index 2f79bd7..25a09f8 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -31,6 +30,7 @@ import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.camel.CamelContext; import org.apache.camel.Service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +49,8 @@ public class KubernetesLeadershipController implements Service { LEADER } + private CamelContext camelContext; + private KubernetesClient kubernetesClient; private KubernetesLockConfiguration lockConfiguration; @@ -65,7 +67,8 @@ public class KubernetesLeadershipController implements Service { private volatile ConfigMap latestConfigMap; private volatile Set<String> latestMembers; - public KubernetesLeadershipController(KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) { + public KubernetesLeadershipController(CamelContext camelContext, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) { + this.camelContext = camelContext; this.kubernetesClient = kubernetesClient; this.lockConfiguration = lockConfiguration; this.eventHandler = eventHandler; @@ -75,8 +78,8 @@ public class KubernetesLeadershipController implements Service { public void start() throws Exception { if (serializedExecutor == null) { LOG.debug("{} Starting leadership controller...", logPrefix()); - serializedExecutor = Executors.newSingleThreadScheduledExecutor(); - leaderNotifier = new TimedLeaderNotifier(this.eventHandler); + serializedExecutor = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "CamelKubernetesLeadershipController"); + leaderNotifier = new TimedLeaderNotifier(this.camelContext, this.eventHandler); leaderNotifier.start(); serializedExecutor.execute(this::refreshStatus); http://git-wip-us.apache.org/repos/asf/camel/blob/a3d0df31/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java index c95b517..f805536 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java @@ -20,12 +20,12 @@ import java.util.Collections; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.camel.CamelContext; import org.apache.camel.Service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +39,8 @@ public class TimedLeaderNotifier implements Service { private static final long FIXED_DELAY = 10; + private CamelContext camelContext; + private KubernetesClusterEventHandler handler; private Lock lock = new ReentrantLock(); @@ -58,7 +60,8 @@ public class TimedLeaderNotifier implements Service { private long changeCounter; - public TimedLeaderNotifier(KubernetesClusterEventHandler handler) { + public TimedLeaderNotifier(CamelContext camelContext, KubernetesClusterEventHandler handler) { + this.camelContext = Objects.requireNonNull(camelContext, "Camel context must be present"); this.handler = Objects.requireNonNull(handler, "Handler must be present"); } @@ -91,7 +94,7 @@ public class TimedLeaderNotifier implements Service { @Override public void start() throws Exception { if (this.executor == null) { - this.executor = Executors.newSingleThreadScheduledExecutor(); + this.executor = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "CamelKubernetesLeaderNotifier"); } } http://git-wip-us.apache.org/repos/asf/camel/blob/a3d0df31/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java index 4a2a11e..62174c2 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java @@ -285,8 +285,8 @@ public class KubernetesClusterServiceTest extends CamelTestSupport { LeaderRecorder recorder = new LeaderRecorder(); try { - member.getView(namespace).addEventListener(recorder); context().addService(member); + member.getView(namespace).addEventListener(recorder); } catch (Exception ex) { throw new RuntimeException(ex); } http://git-wip-us.apache.org/repos/asf/camel/blob/a3d0df31/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java index 8380147..164912d 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java @@ -22,8 +22,10 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import org.apache.camel.CamelContext; import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent; import org.apache.camel.component.kubernetes.ha.lock.TimedLeaderNotifier; +import org.apache.camel.impl.DefaultCamelContext; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -35,6 +37,8 @@ import static org.junit.Assert.assertEquals; */ public class TimedLeaderNotifierTest { + private CamelContext context; + private TimedLeaderNotifier notifier; private volatile Optional<String> currentLeader; @@ -43,7 +47,10 @@ public class TimedLeaderNotifierTest { @Before public void init() throws Exception { - this.notifier = new TimedLeaderNotifier(e -> { + this.context = new DefaultCamelContext(); + this.context.start(); + + this.notifier = new TimedLeaderNotifier(context, e -> { if (e instanceof KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) { currentLeader = ((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) e).getData(); } else if (e instanceof KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) { @@ -56,6 +63,7 @@ public class TimedLeaderNotifierTest { @After public void destroy() throws Exception { this.notifier.stop(); + this.context.stop(); } @Test