Repository: karaf-cellar Updated Branches: refs/heads/cellar-3.0.x 8712d50b5 -> f2ffeb5ff
[KARAF-1842] Implemented Service Tracker to remove unavailable distributed service. Project: http://git-wip-us.apache.org/repos/asf/karaf-cellar/repo Commit: http://git-wip-us.apache.org/repos/asf/karaf-cellar/commit/f2ffeb5f Tree: http://git-wip-us.apache.org/repos/asf/karaf-cellar/tree/f2ffeb5f Diff: http://git-wip-us.apache.org/repos/asf/karaf-cellar/diff/f2ffeb5f Branch: refs/heads/cellar-3.0.x Commit: f2ffeb5ffa84276eef568719fe5b7086b0aee1f5 Parents: 8712d50 Author: Jean-Baptiste Onofré <[email protected]> Authored: Sun Sep 20 11:43:26 2015 +0200 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Sun Sep 20 11:43:26 2015 +0200 ---------------------------------------------------------------------- .../karaf/cellar/dosgi/ServiceTracker.java | 110 +++++++++++++++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 4 + .../hazelcast/HazelcastClusterManager.java | 6 +- .../cellar/hazelcast/HazelcastGroupManager.java | 2 +- .../hazelcast/HazelcastInstanceAware.java | 2 +- .../karaf/cellar/hazelcast/QueueConsumer.java | 36 +++--- 6 files changed, 140 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/f2ffeb5f/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java ---------------------------------------------------------------------- diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java new file mode 100644 index 0000000..45c897c --- /dev/null +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.karaf.cellar.dosgi; + +import org.apache.karaf.cellar.core.ClusterManager; +import org.apache.karaf.cellar.core.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Listener called when a new service is exported. + */ +public class ServiceTracker implements Runnable { + + private static final transient Logger LOGGER = LoggerFactory.getLogger(ServiceTracker.class); + + private ClusterManager clusterManager; + + private Map<String, EndpointDescription> remoteEndpoints; + + private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + public void init() { + LOGGER.info("CELLAR SERVICE TRACKER: a new Task initialized"); + remoteEndpoints = clusterManager.getMap(Constants.REMOTE_ENDPOINTS); + scheduler.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS); + } + + public void destroy() { + LOGGER.info("CELLAR SERVICE TRACKER: task is being destroyed"); + scheduler.shutdown(); + } + + public ClusterManager getClusterManager() { + return clusterManager; + } + + public void setClusterManager(ClusterManager clusterManager) { + this.clusterManager = clusterManager; + } + + + @Override + public void run() { + LOGGER.trace("SERVICE TRACKER: running the service tracker task"); + ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + try { + if (!remoteEndpoints.isEmpty()) { + LOGGER.trace("SERVICE TRACKER: Founded {} remote endpoints", remoteEndpoints.size()); + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + final Set<Node> activeNodes = clusterManager.listNodes(); + + // create a clone of remote endpoint to avoid concurrency concerns while iterating it + final Set<Map.Entry<String, EndpointDescription>> list = new HashSet<Map.Entry<String, EndpointDescription>>(remoteEndpoints.entrySet()); + + for (Map.Entry<String, EndpointDescription> entry : list) { + final EndpointDescription endpointDescription = entry.getValue(); + final String key = entry.getKey(); + + // create a clone of nodes to avoid concurrency concerns while iterating it + final Set<Node> nodes = new HashSet<Node>(endpointDescription.getNodes()); + + boolean endpointChanged = false; + for(Node n : nodes) { + if(!activeNodes.contains(n)) { + LOGGER.debug("SERVICE TRACKER: Removing node with id {} since it is not active", n.getId()); + endpointDescription.getNodes().remove(n); + endpointChanged = true; + } + } + + if(endpointChanged) { + // if the endpoint is used for export from other nodes too, then update it + if (endpointDescription.getNodes().size() > 0) { + LOGGER.debug("SERVICE TRACKER: Updating remote endpoint {}", key); + remoteEndpoints.put(key, endpointDescription); + } else { // remove endpoint permanently + LOGGER.debug("SERVICE TRACKER: Removing remote endpoint {}", key); + remoteEndpoints.remove(key); + } + } + } + } else { + LOGGER.debug("SERVICE TRACKER: no remote endpoints found"); + } + } catch (Exception e) { + LOGGER.error("SERVICE TRACKER: failed to run service tracker task",e); + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } + +} http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/f2ffeb5f/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml ---------------------------------------------------------------------- diff --git a/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 51171fc..75ee4c0 100644 --- a/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -55,4 +55,8 @@ <reference id="commandStore" interface="org.apache.karaf.cellar.core.command.CommandStore"/> <reference id="configurationAdmin" interface="org.osgi.service.cm.ConfigurationAdmin"/> + + <bean id="serviceTracker" class="org.apache.karaf.cellar.dosgi.ServiceTracker" init-method="init" destroy-method="destroy"> + <property name="clusterManager" ref="clusterManager"/> + </bean> </blueprint> http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/f2ffeb5f/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java ---------------------------------------------------------------------- diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java index b9a9619..9059eb5 100644 --- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java +++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java @@ -87,7 +87,7 @@ public class HazelcastClusterManager extends HazelcastInstanceAware implements C Set<Member> members = cluster.getMembers(); if (members != null && !members.isEmpty()) { for (Member member : members) { - HazelcastNode node = new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort()); + HazelcastNode node = new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort()); nodes.add(node); } } @@ -110,7 +110,7 @@ public class HazelcastClusterManager extends HazelcastInstanceAware implements C Set<Member> members = cluster.getMembers(); if (members != null && !members.isEmpty()) { for (Member member : members) { - HazelcastNode node = new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort()); + HazelcastNode node = new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort()); if (ids.contains(node.getId())) { nodes.add(node); } @@ -135,7 +135,7 @@ public class HazelcastClusterManager extends HazelcastInstanceAware implements C Set<Member> members = cluster.getMembers(); if (members != null && !members.isEmpty()) { for (Member member : members) { - HazelcastNode node = new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort()); + HazelcastNode node = new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort()); if (id.equals(node.getId())) { return node; } http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/f2ffeb5f/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java ---------------------------------------------------------------------- diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java index 57a5d71..78370dd 100644 --- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java +++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java @@ -140,7 +140,7 @@ public class HazelcastGroupManager implements GroupManager, EntryListener, Confi Cluster cluster = instance.getCluster(); if (cluster != null) { Member member = cluster.getLocalMember(); - node = new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort()); + node = new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort()); } return node; } finally { http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/f2ffeb5f/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java ---------------------------------------------------------------------- diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java index 87e1288..7c9f14d 100644 --- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java +++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java @@ -42,7 +42,7 @@ public class HazelcastInstanceAware { Cluster cluster = instance.getCluster(); if (cluster != null) { Member member = cluster.getLocalMember(); - return new HazelcastNode(member.getInetSocketAddress().getHostName(), member.getInetSocketAddress().getPort()); + return new HazelcastNode(member.getSocketAddress().getHostString(), member.getSocketAddress().getPort()); } else { return null; } http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/f2ffeb5f/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java ---------------------------------------------------------------------- diff --git a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java index 85c7534..d19461d 100644 --- a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java +++ b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java @@ -86,26 +86,32 @@ public class QueueConsumer<E extends Event> implements EventConsumer<E>, ItemLis @Override public void run() { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); - try { - while (isConsuming) { - if (combinedClassLoader != null) { - Thread.currentThread().setContextClassLoader(combinedClassLoader); - } else Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - E e = null; - try { - e = getQueue().poll(10, TimeUnit.SECONDS); - } catch (InterruptedException e1) { - LOGGER.warn("CELLAR HAZELCAST: consume task interrupted"); - } + E e; + while (isConsuming) { + if (combinedClassLoader != null) { + Thread.currentThread().setContextClassLoader(combinedClassLoader); + } else Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + e = null; + try { + e = getQueue().poll(10, TimeUnit.SECONDS); + } catch (InterruptedException e1) { + LOGGER.warn("CELLAR HAZELCAST: consume task interrupted"); + } catch (Exception e2) { + // catch everything from Hazelcast to prevent the death of Queue Consumer task + LOGGER.warn("CELLAR HAZELCAST: consumer task failed to poll the queue", e2); + } + + try { if (e != null) { consume(e); } + } catch (Exception e1) { + LOGGER.error("CELLAR HAZELCAST: failed to consume from queue", e1); } - } catch (Exception ex) { - LOGGER.error("CELLAR HAZELCAST: failed to consume from queue", ex); - } finally { - Thread.currentThread().setContextClassLoader(originalClassLoader); + } + + Thread.currentThread().setContextClassLoader(originalClassLoader); } /**
