Repository: oozie
Updated Branches:
  refs/heads/master 0439fbb05 -> c25818eae


OOZIE-1916 Use Curator leader latch instead of checking the order of Oozie servers (rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c25818ea
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c25818ea
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c25818ea

Branch: refs/heads/master
Commit: c25818eaee20060f384481454f35d650f12c67b5
Parents: 0439fbb
Author: Robert Kanter <[email protected]>
Authored: Tue Jul 8 14:43:43 2014 -0700
Committer: Robert Kanter <[email protected]>
Committed: Tue Jul 8 14:43:43 2014 -0700

----------------------------------------------------------------------
 .../oozie/service/JobsConcurrencyService.java   |  2 +-
 .../org/apache/oozie/service/PurgeService.java  |  4 +-
 .../apache/oozie/service/ShareLibService.java   |  2 +-
 .../oozie/service/ZKJobsConcurrencyService.java | 29 ++++++++---
 .../java/org/apache/oozie/util/ZKUtils.java     |  7 ++-
 .../service/TestJobsConcurrencyService.java     |  4 +-
 .../service/TestZKJobsConcurrencyService.java   | 54 ++++++++++++++++----
 .../java/org/apache/oozie/test/ZKXTestCase.java | 45 ++++++++++++++--
 release-log.txt                                 |  1 +
 9 files changed, 119 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java 
b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
index 69025fc..b44d9d7 100644
--- a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
@@ -80,7 +80,7 @@ public class JobsConcurrencyService implements Service, 
Instrumentable {
      *
      * @return true
      */
-    public boolean isFirstServer() {
+    public boolean isLeader() {
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/main/java/org/apache/oozie/service/PurgeService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/PurgeService.java 
b/core/src/main/java/org/apache/oozie/service/PurgeService.java
index 9eeee30..635e809 100644
--- a/core/src/main/java/org/apache/oozie/service/PurgeService.java
+++ b/core/src/main/java/org/apache/oozie/service/PurgeService.java
@@ -67,8 +67,8 @@ public class PurgeService implements Service {
         }
 
         public void run() {
-            // Only queue the purge command if this is the first server
-            if (Services.get().get(JobsConcurrencyService.class).isFirstServer()) {
+            // Only queue the purge command if this is the leader
+            if (Services.get().get(JobsConcurrencyService.class).isLeader()) {
                 Services.get().get(CallableQueueService.class).queue(
                         new PurgeXCommand(wfOlderThan, coordOlderThan, 
bundleOlderThan, limit, purgeOldCoordAction));
             }

http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/main/java/org/apache/oozie/service/ShareLibService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ShareLibService.java 
b/core/src/main/java/org/apache/oozie/service/ShareLibService.java
index c31d587..893b85b 100644
--- a/core/src/main/java/org/apache/oozie/service/ShareLibService.java
+++ b/core/src/main/java/org/apache/oozie/service/ShareLibService.java
@@ -133,7 +133,7 @@ public class ShareLibService implements Service, 
Instrumentable {
                 System.out.flush();
                 try {
                     //Only one server should purge sharelib
-                    if (Services.get().get(JobsConcurrencyService.class).isFirstServer()) {
+                    if (Services.get().get(JobsConcurrencyService.class).isLeader()) {
                         final Date current = 
Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime();
                         purgeLibs(fs, LAUNCHER_PREFIX, current);
                         purgeLibs(fs, SHARED_LIB_PREFIX, current);

http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java 
b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
index 58580e5..270253f 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
@@ -24,9 +24,11 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.ZKUtils;
@@ -40,13 +42,21 @@ import org.apache.oozie.util.ZKUtils;
  * place additional stress on ZooKeeper and the Database.  By "assigning" 
different Oozie servers to process different jobs, we can
  * improve this situation.  This is particularly necessary for Services like 
the {@link RecoveryService}, which could duplicate jobs
  * otherwise.  We can assign jobs to servers by doing a mod of the jobs' id 
and the number of servers.
+ * <p>
+ * The leader server is elected by all of the Oozie servers, so there can only 
be one at a time.  This is useful for tasks that
+ * require (or are better off) being done by only one server (e.g. database 
purging).  Note that the leader server isn't a
+ * "traditional leader" in the sense that it doesn't command or have authority 
over the other servers.  This leader election uses
+ * a znode under 
/oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ZK_LEADER_PATH (default is 
/oozie/services/concurrencyleader).
  */
 public class ZKJobsConcurrencyService extends JobsConcurrencyService 
implements Service, Instrumentable {
 
     private ZKUtils zk;
 
     // This pattern gives us the id number without the extra stuff
-    private static Pattern ID_PATTERN = Pattern.compile("(\\d{7})-.*");
+    private static final Pattern ID_PATTERN = Pattern.compile("(\\d{7})-.*");
+
+    private static final String ZK_LEADER_PATH = "concurrencyleader";
+    private static LeaderLatch leaderLatch = null;
 
     /**
      * Initialize the zookeeper jobs concurrency service
@@ -58,6 +68,8 @@ public class ZKJobsConcurrencyService extends 
JobsConcurrencyService implements
         super.init(services);
         try {
             zk = ZKUtils.register(this);
+            leaderLatch = new LeaderLatch(zk.getClient(), ZKUtils.ZK_BASE_SERVICES_PATH + "/" + ZK_LEADER_PATH, zk.getZKId());
+            leaderLatch.start();
         }
         catch (Exception ex) {
             throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
@@ -69,6 +81,9 @@ public class ZKJobsConcurrencyService extends 
JobsConcurrencyService implements
      */
     @Override
     public void destroy() {
+        if (leaderLatch != null) {
+            IOUtils.closeSafely(leaderLatch);
+        }
         if (zk != null) {
             zk.unregister(this);
         }
@@ -87,16 +102,14 @@ public class ZKJobsConcurrencyService extends 
JobsConcurrencyService implements
     }
 
     /**
-     * Check to see if this server is the first server.  This implementation 
only returns true if this server is the first server in
-     * ZooKeeper's list of Oozie servers (so only one Oozie Server will return 
true).
+     * Check to see if this server is the leader server.  This implementation 
only returns true if this server has been elected by
+     * all of the servers as the leader server.
      *
-     * @return true if this server is first; false if not
+     * @return true if this server is the leader; false if not
      */
     @Override
-    public boolean isFirstServer() {
-        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
-        int myIndex = zk.getZKIdIndex(oozies);
-        return (myIndex == 0);
+    public boolean isLeader() {
+        return leaderLatch.hasLeadership();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/main/java/org/apache/oozie/util/ZKUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/ZKUtils.java 
b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
index 885b656..74f6266 100644
--- a/core/src/main/java/org/apache/oozie/util/ZKUtils.java
+++ b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
@@ -63,7 +63,7 @@ import org.apache.zookeeper.data.Stat;
  * to add additional metadata in the future, we share a Map.  They keys are 
defined in {@link ZKMetadataKeys}.
  * <p>
  * For the service discovery, the structure in ZooKeeper is 
/oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ (default is
- * /oozie/services/).  There is currently only one service, named "servers" 
under which each Oozie server creates a ZNode named
+ * /oozie/services/).  ZKUtils has a service named "servers" under which each 
Oozie server creates a ZNode named
  * ${OOZIE_SERVICE_INSTANCE} (default is the hostname) that contains the 
metadata payload.  For example, with the default settings,
  * an Oozie server named "foo" would create a ZNode at 
/oozie/services/servers/foo where the foo ZNode contains the metadata.
  * <p>
@@ -95,7 +95,10 @@ public class ZKUtils {
     public static final String ZK_SECURE = "oozie.zookeeper.secure";
 
     private static final String ZK_OOZIE_SERVICE = "servers";
-    private static final String ZK_BASE_SERVICES_PATH = "/services";
+    /**
+     * Services that need to put a node in zookeeper should go under here.  
Try to keep this area clean and organized.
+     */
+    public static final String ZK_BASE_SERVICES_PATH = "/services";
 
     private static Set<Object> users = new HashSet<Object>();
     private CuratorFramework client = null;

http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java 
b/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java
index 0ba4332..53df650 100644
--- 
a/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java
+++ 
b/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java
@@ -38,11 +38,11 @@ public class TestJobsConcurrencyService extends XTestCase {
         super.tearDown();
     }
 
-    public void testIsFirstServer() throws Exception {
+    public void testIsLeader() throws Exception {
         JobsConcurrencyService jcs = new JobsConcurrencyService();
         try {
             jcs.init(null);
-            assertTrue(jcs.isFirstServer());
+            assertTrue(jcs.isLeader());
         }
         finally {
             jcs.destroy();

http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java 
b/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
index 644a76e..529e15c 100644
--- 
a/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
+++ 
b/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
@@ -54,20 +54,56 @@ public class TestZKJobsConcurrencyService extends 
ZKXTestCase {
         }
     }
 
-    public void testIsFirstServer() throws Exception {
+    public void testIsLeader() throws Exception {
         ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
-        // We'll use some DummyZKXOozies here to pretend to be other Oozie servers that will influence isFirstServer()
-        // once they are running in that it will only return true for the first Oozie "server"
+        // We'll use some DummyZKXOozies here to pretend to be other Oozie servers.  It chooses randomly so we can't check that a
+        // specific server gets chosen.
         DummyZKOozie dummyOozie = null;
         DummyZKOozie dummyOozie2 = null;
         try {
-            dummyOozie = new DummyZKOozie("a", "http://blah");
             zkjcs.init(Services.get());
-            assertFalse(zkjcs.isFirstServer());
-            dummyOozie2 = new DummyZKOozie("b", "http://blah");
-            assertFalse(zkjcs.isFirstServer());
-            dummyOozie.teardown();
-            assertTrue(zkjcs.isFirstServer());
+            dummyOozie = new DummyZKOozie("a", "http://blah", true);
+            dummyOozie2 = new DummyZKOozie("b", "http://blah", true);
+            sleep(3 * 1000);
+            if (zkjcs.isLeader()) {
+                assertFalse(dummyOozie.isLeader());
+                assertFalse(dummyOozie2.isLeader());
+                zkjcs.destroy();
+                sleep(3 * 1000);
+                if (dummyOozie.isLeader()) {
+                    assertFalse(dummyOozie2.isLeader());
+                } else if (dummyOozie2.isLeader()) {
+                    assertFalse(dummyOozie.isLeader());
+                } else {
+                    fail("No leader");
+                }
+            } else if (dummyOozie.isLeader()) {
+                assertFalse(zkjcs.isLeader());
+                assertFalse(dummyOozie2.isLeader());
+                dummyOozie.teardown();
+                sleep(3 * 1000);
+                if (zkjcs.isLeader()) {
+                    assertFalse(dummyOozie2.isLeader());
+                } else if (dummyOozie2.isLeader()) {
+                    assertFalse(zkjcs.isLeader());
+                } else {
+                    fail("No leader");
+                }
+            } else if (dummyOozie2.isLeader()) {
+                assertFalse(zkjcs.isLeader());
+                assertFalse(dummyOozie.isLeader());
+                dummyOozie2.teardown();
+                sleep(3 * 1000);
+                if (zkjcs.isLeader()) {
+                    assertFalse(dummyOozie.isLeader());
+                } else if (dummyOozie.isLeader()) {
+                    assertFalse(zkjcs.isLeader());
+                } else {
+                    fail("No leader");
+                }
+            } else {
+                fail("No leader");
+            }
         }
         finally {
             zkjcs.destroy();

http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java 
b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
index 946a4b9..6a2dd32 100644
--- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.oozie.test;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -25,6 +26,7 @@ import java.util.Map.Entry;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.utils.EnsurePath;
@@ -165,26 +167,43 @@ public abstract class ZKXTestCase extends XDataTestCase {
     /**
      * Provides a class that can pretend to be another Oozie Server as far as 
ZooKeeper and anything using ZKUtils is concerned.
      * You can specify the ID and URL of the Oozie Server.  It will "start" 
when the constructor is called and can be "stopped"
-     * by calling {@link DummyZKOozie#teardown() }.  Make sure to tear down 
any DummyZKOozies that you create.
+     * by calling {@link DummyZKOozie#teardown()}.  It can also optionally 
join the ZKJobsConcurrencyService leader election.
+     * Make sure to tear down any DummyZKOozies that you create.
      */
     protected class DummyZKOozie {
         private CuratorFramework client = null;
         private String zkId;
         private ServiceDiscovery<Map> sDiscovery;
         private String metadataUrl;
+        private LeaderLatch leaderLatch = null;
 
         /**
-         * Creates a DummyZKOozie.
+         * Creates a DummyZKOozie.  Will not join the ZKJobsConcurrencyService 
leader election.
          *
          * @param zkId The ID of this new Oozie "server"
          * @param metadataUrl The URL to advertise for this "server"
          * @throws Exception
          */
         public DummyZKOozie(String zkId, String metadataUrl) throws Exception {
+            this(zkId, metadataUrl, false);
+        }
+
+        /**
+         * Creates a DummyZKOozie.
+         *
+         * @param zkId The ID of this new Oozie "server"
+         * @param metadataUrl The URL to advertise for this "server"
+         * @param joinConcurrencyLeaderElection true if should join 
ZKJobsConcurrencyService leader election; false if not
+         * @throws Exception
+         */
+        public DummyZKOozie(String zkId, String metadataUrl, boolean joinConcurrencyLeaderElection) throws Exception {
             this.zkId = zkId;
             this.metadataUrl = metadataUrl;
             createClient();
             advertiseService();
+            if (joinConcurrencyLeaderElection) {
+                joinConcurrencyLeaderElection();
+            }
         }
 
         private void createClient() throws Exception {
@@ -221,11 +240,29 @@ public abstract class ZKXTestCase extends XDataTestCase {
             sleep(1000);    // Sleep to allow ZKUtils ServiceCache to update
         }
 
+        private void joinConcurrencyLeaderElection() throws Exception {
+            leaderLatch = new LeaderLatch(client, "/services/concurrencyleader", zkId);
+            leaderLatch.start();
+        }
+
+        public boolean isLeader() {
+            if (leaderLatch != null) {
+                return leaderLatch.hasLeadership();
+            }
+            throw new RuntimeException("Must join concurrency leader election");
+        }
+
         public void teardown() {
+            if (leaderLatch != null) {
+                try {
+                    leaderLatch.close();
+                } catch (IOException ioe) {
+                    log.warn("Exception occured while leaving leader latch", ioe);
+                }
+            }
             try {
                 unadvertiseService();
-            }
-            catch (Exception ex) {
+            } catch (Exception ex) {
                 log.warn("Exception occurred while unadvertising: " + 
ex.getMessage(), ex);
             }
             client.close();

http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 5ac93f2..c5a9fd8 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1916 Use Curator leader latch instead of checking the order of Oozie servers (rkanter)
 OOZIE-1886 Queue operation talking longer time (shwethags via rohini)
 OOZIE-1865 Oozie servers can't talk to each other with Oozie HA and Kerberos 
(rkanter)
 OOZIE-1821 Oozie java action fails due to AlreadyBeingCreatedException 
(abhishek.agarwal via rkanter)

Reply via email to