Author: stefanegli
Date: Wed Nov 20 17:02:24 2013
New Revision: 1543877

URL: http://svn.apache.org/r1543877
Log:
SLING-3253 : fixed leader election: actually using calculated leaderId 
resulting from leader election, instead of doing the leader election again 
based on only the slingId

Modified:
    
sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java
    
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java
    
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java

Modified: 
sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java?rev=1543877&r1=1543876&r2=1543877&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java
 (original)
+++ 
sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java
 Wed Nov 20 17:02:24 2013
@@ -25,6 +25,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.discovery.impl.Config;
 import org.apache.sling.discovery.impl.common.DefaultClusterViewImpl;
@@ -51,6 +52,7 @@ public class EstablishedClusterView exte
             final String localId) {
         super(view.getViewId());
 
+        String leaderId = 
view.getResource().adaptTo(ValueMap.class).get("leaderId", String.class);
         final Iterator<Resource> it1 = view.getResource().getChild("members")
                 .getChildren().iterator();
         final List<Resource> instanceRess = new LinkedList<Resource>();
@@ -73,8 +75,11 @@ public class EstablishedClusterView exte
             }
         });
 
-        final Resource leader = instanceRess.get(0);
-        final String leaderId = leader.getName();
+        if (leaderId==null || leaderId.length()==0) {
+               // fallback to pre-SLING-3253: choose leader based on slingId 
alone.
+               final Resource leader = instanceRess.get(0);
+               leaderId = leader.getName();
+        }
         InstanceDescription leaderInstance = null;
 
         for (Iterator<Resource> it2 = instanceRess.iterator(); it2.hasNext();) 
{

Modified: 
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java?rev=1543877&r1=1543876&r2=1543877&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java
 (original)
+++ 
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java
 Wed Nov 20 17:02:24 2013
@@ -19,6 +19,8 @@
 package org.apache.sling.discovery.impl.cluster;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -73,6 +75,57 @@ public class ClusterTest {
         instance2 = null;
         instance3 = null;
     }
+    
+    /** test leader behaviour with ascending slingIds, SLING-3253 **/
+    @Test
+    public void testLeaderAsc() throws Throwable {
+       doTestLeader("000", "111");
+    }
+
+    /** test leader behaviour with descending slingIds, SLING-3253 **/
+    @Test
+    public void testLeaderDesc() throws Throwable {
+       doTestLeader("111", "000");
+    }
+
+    private void doTestLeader(String slingId1, String slingId2) throws 
Throwable {
+       // stop 1 and 2 and create them with a lower heartbeat timeout
+       instance2.stopHeartbeats();
+       instance1.stopHeartbeats();
+        instance2.stop();
+        instance1.stop();
+        instance1 = Instance.newStandaloneInstance("/var/discovery/impl/", 
"firstInstance", true, 1, 1, slingId1);
+        // sleep so that the two dont have the same startup time, and thus 
leaderElectionId is lower for instance1
+        Thread.sleep(200);
+        instance2 = Instance.newClusterInstance("/var/discovery/impl/", 
"secondInstance", instance1,
+                false, 1, 1, slingId2);
+        assertNotNull(instance1);
+        assertNotNull(instance2);
+
+        // the two instances are still isolated - so in a cluster of size 1
+        assertEquals(1, 
instance1.getClusterViewService().getClusterView().getInstances().size());
+        assertEquals(1, 
instance2.getClusterViewService().getClusterView().getInstances().size());
+        assertTrue(instance1.getLocalInstanceDescription().isLeader());
+        assertTrue(instance2.getLocalInstanceDescription().isLeader());
+
+        // let the sync/voting happen
+        instance1.runHeartbeatOnce();
+        instance2.runHeartbeatOnce();
+        Thread.sleep(500);
+        instance1.runHeartbeatOnce();
+        instance2.runHeartbeatOnce();
+        Thread.sleep(500);
+        instance1.runHeartbeatOnce();
+        instance2.runHeartbeatOnce();
+        
+        // now they must be in the same cluster, so in a cluster of size 1
+        assertEquals(2, 
instance1.getClusterViewService().getClusterView().getInstances().size());
+        assertEquals(2, 
instance2.getClusterViewService().getClusterView().getInstances().size());
+        
+        // the first instance should be the leader - since it was started first
+        assertTrue(instance1.getLocalInstanceDescription().isLeader());
+        assertFalse(instance2.getLocalInstanceDescription().isLeader());
+    }
 
     @Test
     public void testStableClusterId() throws Throwable {

Modified: 
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java?rev=1543877&r1=1543876&r2=1543877&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java
 (original)
+++ 
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java
 Wed Nov 20 17:02:24 2013
@@ -18,9 +18,12 @@
  */
 package org.apache.sling.discovery.impl.setup;
 
+import static org.junit.Assert.fail;
+
 import java.lang.reflect.InvocationTargetException;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
@@ -47,6 +50,7 @@ import org.apache.sling.commons.schedule
 import org.apache.sling.commons.scheduler.impl.QuartzScheduler;
 import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.commons.threads.impl.DefaultThreadPoolManager;
+import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.discovery.PropertyProvider;
 import org.apache.sling.discovery.TopologyEventListener;
 import org.apache.sling.discovery.impl.Config;
@@ -65,7 +69,7 @@ public class Instance {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    public final String slingId = UUID.randomUUID().toString();
+    public final String slingId;
 
     ClusterViewServiceImpl clusterViewService;
 
@@ -150,13 +154,14 @@ public class Instance {
     private Instance(String debugName,
             ResourceResolverFactory resourceResolverFactory, boolean resetRepo)
             throws Exception {
-       this("/var/discovery/impl/", debugName, resourceResolverFactory, 
resetRepo, 20, 1);
+       this("/var/discovery/impl/", debugName, resourceResolverFactory, 
resetRepo, 20, 1, UUID.randomUUID().toString());
     }
     
     private Instance(String discoveryResourcePath, String debugName,
             ResourceResolverFactory resourceResolverFactory, boolean resetRepo,
-            final int heartbeatTimeout, final int minEventDelay)
+            final int heartbeatTimeout, final int minEventDelay, String 
slingId)
             throws Exception {
+       this.slingId = slingId;
         this.debugName = debugName;
 
         osgiMock = new OSGiMock();
@@ -263,10 +268,17 @@ public class Instance {
     }
 
     public static Instance newStandaloneInstance(String discoveryResourcePath, 
String debugName,
+            boolean resetRepo, int heartbeatTimeout, int minEventDelay, String 
slingId) throws Exception {
+        ResourceResolverFactory resourceResolverFactory = MockFactory
+                .mockResourceResolverFactory();
+        return new Instance(discoveryResourcePath, debugName, 
resourceResolverFactory, resetRepo, heartbeatTimeout, minEventDelay, slingId);
+    }
+    
+    public static Instance newStandaloneInstance(String discoveryResourcePath, 
String debugName,
             boolean resetRepo, int heartbeatTimeout, int minEventDelay) throws 
Exception {
         ResourceResolverFactory resourceResolverFactory = MockFactory
                 .mockResourceResolverFactory();
-        return new Instance(discoveryResourcePath, debugName, 
resourceResolverFactory, resetRepo, heartbeatTimeout, minEventDelay);
+        return new Instance(discoveryResourcePath, debugName, 
resourceResolverFactory, resetRepo, heartbeatTimeout, minEventDelay, 
UUID.randomUUID().toString());
     }
     
     public static Instance newStandaloneInstance(String debugName,
@@ -277,8 +289,13 @@ public class Instance {
     }
 
     public static Instance newClusterInstance(String discoveryResourcePath, 
String debugName, Instance other,
+            boolean resetRepo, int heartbeatTimeout, int minEventDelay, String 
slingId) throws Exception {
+        return new Instance(discoveryResourcePath, debugName, 
other.resourceResolverFactory, resetRepo, heartbeatTimeout, minEventDelay, 
slingId);
+    }
+
+    public static Instance newClusterInstance(String discoveryResourcePath, 
String debugName, Instance other,
             boolean resetRepo, int heartbeatTimeout, int minEventDelay) throws 
Exception {
-        return new Instance(discoveryResourcePath, debugName, 
other.resourceResolverFactory, resetRepo, heartbeatTimeout, minEventDelay);
+        return new Instance(discoveryResourcePath, debugName, 
other.resourceResolverFactory, resetRepo, heartbeatTimeout, minEventDelay, 
UUID.randomUUID().toString());
     }
 
     public static Instance newClusterInstance(String debugName, Instance other,
@@ -304,6 +321,19 @@ public class Instance {
     public ClusterViewService getClusterViewService() {
         return clusterViewService;
     }
+    
+    public InstanceDescription getLocalInstanceDescription() {
+       final Iterator<InstanceDescription> it = 
getClusterViewService().getClusterView().getInstances().iterator();
+       while(it.hasNext()) {
+               final InstanceDescription id = it.next();
+               if (slingId.equals(id.getSlingId())) {
+                       return id;
+               }
+       }
+       fail("no local instanceDescription found");
+       // never called:
+       return null;
+    }
 
     public void runHeartbeatOnce() {
        logger.info("Instance ["+slingId+"] issues a heartbeat now "+new 
Date());
@@ -318,7 +348,14 @@ public class Instance {
                logger.info("startHeartbeats: stopped.");
        }
                logger.info("startHeartbeats: activating...");
-       OSGiMock.activate(heartbeatHandler);
+       try{
+               OSGiMock.activate(heartbeatHandler);
+       } catch(Error er) {
+               er.printStackTrace(System.out);
+               throw er;
+       } catch(RuntimeException re) {
+               re.printStackTrace(System.out);
+       }
                logger.info("startHeartbeats: initializing...");
        heartbeatRunner = new HeartbeatRunner(intervalInSeconds);
        Thread th = new Thread(heartbeatRunner, "Test-Heartbeat-Runner");


Reply via email to