Repository: karaf-cellar
Updated Branches:
  refs/heads/cellar-3.0.x 8712d50b5 -> f2ffeb5ff


[KARAF-1842] Implemented Service Tracker to remove unavailable distributed 
service.


Project: http://git-wip-us.apache.org/repos/asf/karaf-cellar/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-cellar/commit/f2ffeb5f
Tree: http://git-wip-us.apache.org/repos/asf/karaf-cellar/tree/f2ffeb5f
Diff: http://git-wip-us.apache.org/repos/asf/karaf-cellar/diff/f2ffeb5f

Branch: refs/heads/cellar-3.0.x
Commit: f2ffeb5ffa84276eef568719fe5b7086b0aee1f5
Parents: 8712d50
Author: Jean-Baptiste Onofré <[email protected]>
Authored: Sun Sep 20 11:43:26 2015 +0200
Committer: Jean-Baptiste Onofré <[email protected]>
Committed: Sun Sep 20 11:43:26 2015 +0200

----------------------------------------------------------------------
 .../karaf/cellar/dosgi/ServiceTracker.java      | 110 +++++++++++++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml  |   4 +
 .../hazelcast/HazelcastClusterManager.java      |   6 +-
 .../cellar/hazelcast/HazelcastGroupManager.java |   2 +-
 .../hazelcast/HazelcastInstanceAware.java       |   2 +-
 .../karaf/cellar/hazelcast/QueueConsumer.java   |  36 +++---
 6 files changed, 140 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/f2ffeb5f/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java
----------------------------------------------------------------------
diff --git 
a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java 
b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java
new file mode 100644
index 0000000..45c897c
--- /dev/null
+++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ServiceTracker.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.cellar.dosgi;
+
+import org.apache.karaf.cellar.core.ClusterManager;
+import org.apache.karaf.cellar.core.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Periodic task that removes inactive nodes from the remote endpoints map and drops endpoints left with no node.
+ */
+public class ServiceTracker implements Runnable {
+
+    private static final transient Logger LOGGER = 
LoggerFactory.getLogger(ServiceTracker.class);
+
+    private ClusterManager clusterManager;
+
+    private Map<String, EndpointDescription> remoteEndpoints;
+
+    private ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(1);
+    
+    public void init() {
+        LOGGER.info("CELLAR SERVICE TRACKER: a new Task initialized");
+        remoteEndpoints = clusterManager.getMap(Constants.REMOTE_ENDPOINTS);
+        scheduler.scheduleWithFixedDelay(this, 10, 10, TimeUnit.SECONDS);
+    }
+
+    public void destroy() {
+        LOGGER.info("CELLAR SERVICE TRACKER: task is being destroyed");
+        scheduler.shutdown();
+    }
+
+    public ClusterManager getClusterManager() {
+        return clusterManager;
+    }
+
+    public void setClusterManager(ClusterManager clusterManager) {
+        this.clusterManager = clusterManager;
+    }
+
+
+    @Override
+    public void run() {
+        LOGGER.trace("SERVICE TRACKER: running the service tracker task");
+        ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            if (!remoteEndpoints.isEmpty()) {
+                LOGGER.trace("SERVICE TRACKER: Founded {} remote endpoints", 
remoteEndpoints.size());
+                
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+                final Set<Node> activeNodes = clusterManager.listNodes();
+                
+                // create a clone of remote endpoint to avoid concurrency 
concerns while iterating it
+                final Set<Map.Entry<String, EndpointDescription>> list = new 
HashSet<Map.Entry<String, EndpointDescription>>(remoteEndpoints.entrySet());
+                
+                for (Map.Entry<String, EndpointDescription> entry : list) {
+                    final EndpointDescription endpointDescription = 
entry.getValue();
+                    final String key = entry.getKey();
+                    
+                    // create a clone of nodes to avoid concurrency concerns 
while iterating it
+                    final Set<Node> nodes = new 
HashSet<Node>(endpointDescription.getNodes());
+                    
+                    boolean endpointChanged = false;
+                    for(Node n : nodes) {
+                        if(!activeNodes.contains(n)) {
+                            LOGGER.debug("SERVICE TRACKER: Removing node with 
id {} since it is not active", n.getId());
+                            endpointDescription.getNodes().remove(n);
+                            endpointChanged = true;                        
+                        }
+                    }
+
+                    if(endpointChanged) {
+                        // if the endpoint is used for export from other nodes 
too, then update it
+                        if (endpointDescription.getNodes().size() > 0) {
+                            LOGGER.debug("SERVICE TRACKER: Updating remote 
endpoint {}", key);
+                            remoteEndpoints.put(key, endpointDescription);
+                        } else { // remove endpoint permanently
+                            LOGGER.debug("SERVICE TRACKER: Removing remote 
endpoint {}", key);
+                            remoteEndpoints.remove(key);
+                        }
+                    } 
+                }
+            } else {
+                LOGGER.debug("SERVICE TRACKER: no remote endpoints found");
+            }
+        } catch (Exception e) {
+            LOGGER.error("SERVICE TRACKER: failed to run service tracker 
task",e);
+        } finally {
+           Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+  
+}

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/f2ffeb5f/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git a/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml 
b/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 51171fc..75ee4c0 100644
--- a/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/dosgi/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -55,4 +55,8 @@
     <reference id="commandStore" 
interface="org.apache.karaf.cellar.core.command.CommandStore"/>
     <reference id="configurationAdmin" 
interface="org.osgi.service.cm.ConfigurationAdmin"/>
 
+
+    <bean id="serviceTracker" 
class="org.apache.karaf.cellar.dosgi.ServiceTracker" init-method="init" 
destroy-method="destroy">
+        <property name="clusterManager" ref="clusterManager"/>
+    </bean>
 </blueprint>

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/f2ffeb5f/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
----------------------------------------------------------------------
diff --git 
a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
 
b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
index b9a9619..9059eb5 100644
--- 
a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
+++ 
b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastClusterManager.java
@@ -87,7 +87,7 @@ public class HazelcastClusterManager extends 
HazelcastInstanceAware implements C
             Set<Member> members = cluster.getMembers();
             if (members != null && !members.isEmpty()) {
                 for (Member member : members) {
-                    HazelcastNode node = new 
HazelcastNode(member.getInetSocketAddress().getHostName(), 
member.getInetSocketAddress().getPort());
+                    HazelcastNode node = new 
HazelcastNode(member.getSocketAddress().getHostString(), 
member.getSocketAddress().getPort());
                     nodes.add(node);
                 }
             }
@@ -110,7 +110,7 @@ public class HazelcastClusterManager extends 
HazelcastInstanceAware implements C
                 Set<Member> members = cluster.getMembers();
                 if (members != null && !members.isEmpty()) {
                     for (Member member : members) {
-                        HazelcastNode node = new 
HazelcastNode(member.getInetSocketAddress().getHostName(), 
member.getInetSocketAddress().getPort());
+                        HazelcastNode node = new 
HazelcastNode(member.getSocketAddress().getHostString(), 
member.getSocketAddress().getPort());
                         if (ids.contains(node.getId())) {
                             nodes.add(node);
                         }
@@ -135,7 +135,7 @@ public class HazelcastClusterManager extends 
HazelcastInstanceAware implements C
                 Set<Member> members = cluster.getMembers();
                 if (members != null && !members.isEmpty()) {
                     for (Member member : members) {
-                        HazelcastNode node = new 
HazelcastNode(member.getInetSocketAddress().getHostName(), 
member.getInetSocketAddress().getPort());
+                        HazelcastNode node = new 
HazelcastNode(member.getSocketAddress().getHostString(), 
member.getSocketAddress().getPort());
                         if (id.equals(node.getId())) {
                             return node;
                         }

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/f2ffeb5f/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
----------------------------------------------------------------------
diff --git 
a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
 
b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
index 57a5d71..78370dd 100644
--- 
a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
+++ 
b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastGroupManager.java
@@ -140,7 +140,7 @@ public class HazelcastGroupManager implements GroupManager, 
EntryListener, Confi
             Cluster cluster = instance.getCluster();
             if (cluster != null) {
                 Member member = cluster.getLocalMember();
-                node = new 
HazelcastNode(member.getInetSocketAddress().getHostName(), 
member.getInetSocketAddress().getPort());
+                node = new 
HazelcastNode(member.getSocketAddress().getHostString(), 
member.getSocketAddress().getPort());
             }
             return node;
         } finally {

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/f2ffeb5f/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
----------------------------------------------------------------------
diff --git 
a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
 
b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
index 87e1288..7c9f14d 100644
--- 
a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
+++ 
b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/HazelcastInstanceAware.java
@@ -42,7 +42,7 @@ public class HazelcastInstanceAware {
         Cluster cluster = instance.getCluster();
         if (cluster != null) {
             Member member = cluster.getLocalMember();
-            return new 
HazelcastNode(member.getInetSocketAddress().getHostName(), 
member.getInetSocketAddress().getPort());
+            return new 
HazelcastNode(member.getSocketAddress().getHostString(), 
member.getSocketAddress().getPort());
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/karaf-cellar/blob/f2ffeb5f/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
----------------------------------------------------------------------
diff --git 
a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java 
b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
index 85c7534..d19461d 100644
--- 
a/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
+++ 
b/hazelcast/src/main/java/org/apache/karaf/cellar/hazelcast/QueueConsumer.java
@@ -86,26 +86,32 @@ public class QueueConsumer<E extends Event> implements 
EventConsumer<E>, ItemLis
     @Override
     public void run() {
         ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();
-        try {
-            while (isConsuming) {
-                if (combinedClassLoader != null) {
-                    
Thread.currentThread().setContextClassLoader(combinedClassLoader);
-                } else 
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-                E e = null;
-                try {
-                    e = getQueue().poll(10, TimeUnit.SECONDS);
-                } catch (InterruptedException e1) {
-                    LOGGER.warn("CELLAR HAZELCAST: consume task interrupted");
-                }
+        E e;
+        while (isConsuming) {
+            if (combinedClassLoader != null) {
+                
Thread.currentThread().setContextClassLoader(combinedClassLoader);
+            } else 
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+            e = null;
+            try {
+                e = getQueue().poll(10, TimeUnit.SECONDS);
+            } catch (InterruptedException e1) {
+                LOGGER.warn("CELLAR HAZELCAST: consume task interrupted");
+            } catch (Exception e2) {
+                // catch everything from Hazelcast to prevent the death of 
Queue Consumer task
+                LOGGER.warn("CELLAR HAZELCAST: consumer task failed to poll 
the queue", e2);
+            }
+            
+            try {
                 if (e != null) {
                     consume(e);
                 }
+            } catch (Exception e1) {
+                LOGGER.error("CELLAR HAZELCAST: failed to consume from queue", 
e1);
             }
-        } catch (Exception ex) {
-            LOGGER.error("CELLAR HAZELCAST: failed to consume from queue", ex);
-        } finally {
-            Thread.currentThread().setContextClassLoader(originalClassLoader);
+
         }
+        
+        Thread.currentThread().setContextClassLoader(originalClassLoader);
     }
 
     /**

Reply via email to