Repository: camel
Updated Branches:
  refs/heads/master 9fc6d0bda -> a3d0df31d


CAMEL-11331: Using Camel thread pools


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a3d0df31
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a3d0df31
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a3d0df31

Branch: refs/heads/master
Commit: a3d0df31dc067d55adf7f654cd545f19a400341e
Parents: 4a0f822
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Authored: Tue Aug 8 15:39:22 2017 +0200
Committer: Nicola Ferraro <ni.ferr...@gmail.com>
Committed: Tue Aug 8 16:39:43 2017 +0200

----------------------------------------------------------------------
 .../kubernetes/ha/KubernetesClusterService.java         |  2 +-
 .../component/kubernetes/ha/KubernetesClusterView.java  | 12 ++++++++----
 .../ha/lock/KubernetesLeadershipController.java         | 11 +++++++----
 .../kubernetes/ha/lock/TimedLeaderNotifier.java         |  9 ++++++---
 .../kubernetes/ha/KubernetesClusterServiceTest.java     |  2 +-
 .../kubernetes/ha/TimedLeaderNotifierTest.java          | 10 +++++++++-
 6 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a3d0df31/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
index 1c95b22..00cb04d 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
@@ -57,7 +57,7 @@ public class KubernetesClusterService extends 
AbstractCamelClusterService<Kubern
     protected KubernetesClusterView createView(String namespace) throws 
Exception {
         KubernetesLockConfiguration lockConfig = 
lockConfigWithGroupNameAndDefaults(namespace);
         KubernetesConfiguration config = 
setConfigDefaults(this.configuration.copy(), lockConfig);
-        return new KubernetesClusterView(this, config, lockConfig);
+        return new KubernetesClusterView(getCamelContext(), this, config, lockConfig);
     }
 
     protected KubernetesConfiguration 
setConfigDefaults(KubernetesConfiguration configuration, 
KubernetesLockConfiguration lockConfiguration) {

http://git-wip-us.apache.org/repos/asf/camel/blob/a3d0df31/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
index ddda675..a67b662 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
@@ -27,6 +27,7 @@ import java.util.stream.Collectors;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.component.kubernetes.KubernetesConfiguration;
 import org.apache.camel.component.kubernetes.KubernetesHelper;
 import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent;
@@ -42,6 +43,8 @@ import org.apache.camel.util.ObjectHelper;
  */
 public class KubernetesClusterView extends AbstractCamelClusterView {
 
+    private CamelContext camelContext;
+
     private KubernetesClient kubernetesClient;
 
     private KubernetesConfiguration configuration;
@@ -58,10 +61,11 @@ public class KubernetesClusterView extends 
AbstractCamelClusterView {
 
     private KubernetesLeadershipController controller;
 
-    public KubernetesClusterView(KubernetesClusterService cluster, KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) {
+    public KubernetesClusterView(CamelContext camelContext, KubernetesClusterService cluster, KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) {
         super(cluster, lockConfiguration.getGroupName());
-        this.configuration = configuration;
-        this.lockConfiguration = lockConfiguration;
+        this.camelContext = ObjectHelper.notNull(camelContext, "camelContext");
+        this.configuration = ObjectHelper.notNull(configuration, "configuration");
+        this.lockConfiguration = ObjectHelper.notNull(lockConfiguration, "lockConfiguration");
         this.localMember = new 
KubernetesClusterMember(lockConfiguration.getPodName());
         this.memberCache = new HashMap<>();
     }
@@ -86,7 +90,7 @@ public class KubernetesClusterView extends 
AbstractCamelClusterView {
         if (controller == null) {
             this.kubernetesClient = 
KubernetesHelper.getKubernetesClient(configuration);
 
-            controller = new KubernetesLeadershipController(kubernetesClient, this.lockConfiguration, event -> {
+            controller = new KubernetesLeadershipController(camelContext, kubernetesClient, this.lockConfiguration, event -> {
                 if (event instanceof 
KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) {
                     // New leader
                     Optional<String> leader = 
KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(event).getData();

http://git-wip-us.apache.org/repos/asf/camel/blob/a3d0df31/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
index 2f79bd7..25a09f8 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -31,6 +30,7 @@ import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.client.KubernetesClient;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Service;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +49,8 @@ public class KubernetesLeadershipController implements 
Service {
         LEADER
     }
 
+    private CamelContext camelContext;
+
     private KubernetesClient kubernetesClient;
 
     private KubernetesLockConfiguration lockConfiguration;
@@ -65,7 +67,8 @@ public class KubernetesLeadershipController implements 
Service {
     private volatile ConfigMap latestConfigMap;
     private volatile Set<String> latestMembers;
 
-    public KubernetesLeadershipController(KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) {
+    public KubernetesLeadershipController(CamelContext camelContext, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) {
+        this.camelContext = camelContext;
         this.kubernetesClient = kubernetesClient;
         this.lockConfiguration = lockConfiguration;
         this.eventHandler = eventHandler;
@@ -75,8 +78,8 @@ public class KubernetesLeadershipController implements 
Service {
     public void start() throws Exception {
         if (serializedExecutor == null) {
             LOG.debug("{} Starting leadership controller...", logPrefix());
-            serializedExecutor = Executors.newSingleThreadScheduledExecutor();
-            leaderNotifier = new TimedLeaderNotifier(this.eventHandler);
+            serializedExecutor = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "CamelKubernetesLeadershipController");
+            leaderNotifier = new TimedLeaderNotifier(this.camelContext, this.eventHandler);
 
             leaderNotifier.start();
             serializedExecutor.execute(this::refreshStatus);

http://git-wip-us.apache.org/repos/asf/camel/blob/a3d0df31/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java
index c95b517..f805536 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/TimedLeaderNotifier.java
@@ -20,12 +20,12 @@ import java.util.Collections;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Service;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +39,8 @@ public class TimedLeaderNotifier implements Service {
 
     private static final long FIXED_DELAY = 10;
 
+    private CamelContext camelContext;
+
     private KubernetesClusterEventHandler handler;
 
     private Lock lock = new ReentrantLock();
@@ -58,7 +60,8 @@ public class TimedLeaderNotifier implements Service {
 
     private long changeCounter;
 
-    public TimedLeaderNotifier(KubernetesClusterEventHandler handler) {
+    public TimedLeaderNotifier(CamelContext camelContext, KubernetesClusterEventHandler handler) {
+        this.camelContext = Objects.requireNonNull(camelContext, "Camel context must be present");
         this.handler = Objects.requireNonNull(handler, "Handler must be present");
     }
 
@@ -91,7 +94,7 @@ public class TimedLeaderNotifier implements Service {
     @Override
     public void start() throws Exception {
         if (this.executor == null) {
-            this.executor = Executors.newSingleThreadScheduledExecutor();
+            this.executor = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "CamelKubernetesLeaderNotifier");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/a3d0df31/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
index 4a2a11e..62174c2 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
@@ -285,8 +285,8 @@ public class KubernetesClusterServiceTest extends 
CamelTestSupport {
 
         LeaderRecorder recorder = new LeaderRecorder();
         try {
-            member.getView(namespace).addEventListener(recorder);
             context().addService(member);
+            member.getView(namespace).addEventListener(recorder);
         } catch (Exception ex) {
             throw new RuntimeException(ex);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/a3d0df31/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java
index 8380147..164912d 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java
@@ -22,8 +22,10 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent;
 import org.apache.camel.component.kubernetes.ha.lock.TimedLeaderNotifier;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,6 +37,8 @@ import static org.junit.Assert.assertEquals;
  */
 public class TimedLeaderNotifierTest {
 
+    private CamelContext context;
+
     private TimedLeaderNotifier notifier;
 
     private volatile Optional<String> currentLeader;
@@ -43,7 +47,10 @@ public class TimedLeaderNotifierTest {
 
     @Before
     public void init() throws Exception {
-        this.notifier = new TimedLeaderNotifier(e -> {
+        this.context = new DefaultCamelContext();
+        this.context.start();
+
+        this.notifier = new TimedLeaderNotifier(context, e -> {
             if (e instanceof 
KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) {
                 currentLeader = 
((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) e).getData();
             } else if (e instanceof 
KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) {
@@ -56,6 +63,7 @@ public class TimedLeaderNotifierTest {
     @After
     public void destroy() throws Exception {
         this.notifier.stop();
+        this.context.stop();
     }
 
     @Test

Reply via email to