Repository: oozie Updated Branches: refs/heads/master 0439fbb05 -> c25818eae
OOZIE-1916 Use Curator leader latch instead of checking the order of Oozie servers (rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c25818ea Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c25818ea Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c25818ea Branch: refs/heads/master Commit: c25818eaee20060f384481454f35d650f12c67b5 Parents: 0439fbb Author: Robert Kanter <[email protected]> Authored: Tue Jul 8 14:43:43 2014 -0700 Committer: Robert Kanter <[email protected]> Committed: Tue Jul 8 14:43:43 2014 -0700 ---------------------------------------------------------------------- .../oozie/service/JobsConcurrencyService.java | 2 +- .../org/apache/oozie/service/PurgeService.java | 4 +- .../apache/oozie/service/ShareLibService.java | 2 +- .../oozie/service/ZKJobsConcurrencyService.java | 29 ++++++++--- .../java/org/apache/oozie/util/ZKUtils.java | 7 ++- .../service/TestJobsConcurrencyService.java | 4 +- .../service/TestZKJobsConcurrencyService.java | 54 ++++++++++++++++---- .../java/org/apache/oozie/test/ZKXTestCase.java | 45 ++++++++++++++-- release-log.txt | 1 + 9 files changed, 119 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java index 69025fc..b44d9d7 100644 --- a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java +++ b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java @@ -80,7 +80,7 @@ public class JobsConcurrencyService implements Service, Instrumentable { * * @return true */ - public boolean isFirstServer() { + public boolean isLeader() { return true; } http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/main/java/org/apache/oozie/service/PurgeService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/PurgeService.java b/core/src/main/java/org/apache/oozie/service/PurgeService.java index 9eeee30..635e809 100644 --- a/core/src/main/java/org/apache/oozie/service/PurgeService.java +++ b/core/src/main/java/org/apache/oozie/service/PurgeService.java @@ -67,8 +67,8 @@ public class PurgeService implements Service { } public void run() { - // Only queue the purge command if this is the first server - if (Services.get().get(JobsConcurrencyService.class).isFirstServer()) { + // Only queue the purge command if this is the leader + if (Services.get().get(JobsConcurrencyService.class).isLeader()) { Services.get().get(CallableQueueService.class).queue( new PurgeXCommand(wfOlderThan, coordOlderThan, bundleOlderThan, limit, purgeOldCoordAction)); } http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/main/java/org/apache/oozie/service/ShareLibService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ShareLibService.java b/core/src/main/java/org/apache/oozie/service/ShareLibService.java index c31d587..893b85b 100644 --- a/core/src/main/java/org/apache/oozie/service/ShareLibService.java +++ b/core/src/main/java/org/apache/oozie/service/ShareLibService.java @@ -133,7 +133,7 @@ public class ShareLibService implements Service, Instrumentable { System.out.flush(); try { //Only one server should purge sharelib - if (Services.get().get(JobsConcurrencyService.class).isFirstServer()) { + if (Services.get().get(JobsConcurrencyService.class).isLeader()) { final Date current = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime(); purgeLibs(fs, LAUNCHER_PREFIX, current); purgeLibs(fs, SHARED_LIB_PREFIX, current); http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java index 58580e5..270253f 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java @@ -24,9 +24,11 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.oozie.ErrorCode; import org.apache.oozie.client.rest.RestConstants; +import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.Instrumentable; import org.apache.oozie.util.Instrumentation; import org.apache.oozie.util.ZKUtils; @@ -40,13 +42,21 @@ import org.apache.oozie.util.ZKUtils; * place additional stress on ZooKeeper and the Database. By "assigning" different Oozie servers to process different jobs, we can * improve this situation. This is particularly necessary for Services like the {@link RecoveryService}, which could duplicate jobs * otherwise. We can assign jobs to servers by doing a mod of the jobs' id and the number of servers. + * <p> + * The leader server is elected by all of the Oozie servers, so there can only be one at a time. This is useful for tasks that + * require (or are better off) being done by only one server (e.g. database purging). Note that the leader server isn't a + * "traditional leader" in the sense that it doesn't command or have authority over the other servers. This leader election uses + * a znode under /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ZK_LEADER_PATH (default is /oozie/services/concurrencyleader). */ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements Service, Instrumentable { private ZKUtils zk; // This pattern gives us the id number without the extra stuff - private static Pattern ID_PATTERN = Pattern.compile("(\\d{7})-.*"); + private static final Pattern ID_PATTERN = Pattern.compile("(\\d{7})-.*"); + + private static final String ZK_LEADER_PATH = "concurrencyleader"; + private static LeaderLatch leaderLatch = null; /** * Initialize the zookeeper jobs concurrency service @@ -58,6 +68,8 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements super.init(services); try { zk = ZKUtils.register(this); + leaderLatch = new LeaderLatch(zk.getClient(), ZKUtils.ZK_BASE_SERVICES_PATH + "/" + ZK_LEADER_PATH, zk.getZKId()); + leaderLatch.start(); } catch (Exception ex) { throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); @@ -69,6 +81,9 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements */ @Override public void destroy() { + if (leaderLatch != null) { + IOUtils.closeSafely(leaderLatch); + } if (zk != null) { zk.unregister(this); } @@ -87,16 +102,14 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements } /** - * Check to see if this server is the first server. This implementation only returns true if this server is the first server in - * ZooKeeper's list of Oozie servers (so only one Oozie Server will return true). + * Check to see if this server is the leader server. This implementation only returns true if this server has been elected by + * all of the servers as the leader server. * - * @return true if this server is first; false if not + * @return true if this server is the leader; false if not */ @Override - public boolean isFirstServer() { - List<ServiceInstance<Map>> oozies = zk.getAllMetaData(); - int myIndex = zk.getZKIdIndex(oozies); - return (myIndex == 0); + public boolean isLeader() { + return leaderLatch.hasLeadership(); } /** http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/main/java/org/apache/oozie/util/ZKUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/ZKUtils.java b/core/src/main/java/org/apache/oozie/util/ZKUtils.java index 885b656..74f6266 100644 --- a/core/src/main/java/org/apache/oozie/util/ZKUtils.java +++ b/core/src/main/java/org/apache/oozie/util/ZKUtils.java @@ -63,7 +63,7 @@ import org.apache.zookeeper.data.Stat; * to add additional metadata in the future, we share a Map. They keys are defined in {@link ZKMetadataKeys}. * <p> * For the service discovery, the structure in ZooKeeper is /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ (default is - * /oozie/services/). There is currently only one service, named "servers" under which each Oozie server creates a ZNode named + * /oozie/services/). ZKUtils has a service named "servers" under which each Oozie server creates a ZNode named * ${OOZIE_SERVICE_INSTANCE} (default is the hostname) that contains the metadata payload. For example, with the default settings, * an Oozie server named "foo" would create a ZNode at /oozie/services/servers/foo where the foo ZNode contains the metadata. * <p> @@ -95,7 +95,10 @@ public class ZKUtils { public static final String ZK_SECURE = "oozie.zookeeper.secure"; private static final String ZK_OOZIE_SERVICE = "servers"; - private static final String ZK_BASE_SERVICES_PATH = "/services"; + /** + * Services that need to put a node in zookeeper should go under here. Try to keep this area clean and organized. + */ + public static final String ZK_BASE_SERVICES_PATH = "/services"; private static Set<Object> users = new HashSet<Object>(); private CuratorFramework client = null; http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java b/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java index 0ba4332..53df650 100644 --- a/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java +++ b/core/src/test/java/org/apache/oozie/service/TestJobsConcurrencyService.java @@ -38,11 +38,11 @@ public class TestJobsConcurrencyService extends XTestCase { super.tearDown(); } - public void testIsFirstServer() throws Exception { + public void testIsLeader() throws Exception { JobsConcurrencyService jcs = new JobsConcurrencyService(); try { jcs.init(null); - assertTrue(jcs.isFirstServer()); + assertTrue(jcs.isLeader()); } finally { jcs.destroy(); http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java b/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java index 644a76e..529e15c 100644 --- a/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java +++ b/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java @@ -54,20 +54,56 @@ public class TestZKJobsConcurrencyService extends ZKXTestCase { } } - public void testIsFirstServer() throws Exception { + public void testIsLeader() throws Exception { ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService(); - // We'll use some DummyZKXOozies here to pretend to be other Oozie servers that will influence isFirstServer() - // once they are running in that it will only return true for the first Oozie "server" + // We'll use some DummyZKXOozies here to pretend to be other Oozie servers. It chooses randomly so we can't check that a + // specific server gets chosen. DummyZKOozie dummyOozie = null; DummyZKOozie dummyOozie2 = null; try { - dummyOozie = new DummyZKOozie("a", "http://blah"); zkjcs.init(Services.get()); - assertFalse(zkjcs.isFirstServer()); - dummyOozie2 = new DummyZKOozie("b", "http://blah"); - assertFalse(zkjcs.isFirstServer()); - dummyOozie.teardown(); - assertTrue(zkjcs.isFirstServer()); + dummyOozie = new DummyZKOozie("a", "http://blah", true); + dummyOozie2 = new DummyZKOozie("b", "http://blah", true); + sleep(3 * 1000); + if (zkjcs.isLeader()) { + assertFalse(dummyOozie.isLeader()); + assertFalse(dummyOozie2.isLeader()); + zkjcs.destroy(); + sleep(3 * 1000); + if (dummyOozie.isLeader()) { + assertFalse(dummyOozie2.isLeader()); + } else if (dummyOozie2.isLeader()) { + assertFalse(dummyOozie.isLeader()); + } else { + fail("No leader"); + } + } else if (dummyOozie.isLeader()) { + assertFalse(zkjcs.isLeader()); + assertFalse(dummyOozie2.isLeader()); + dummyOozie.teardown(); + sleep(3 * 1000); + if (zkjcs.isLeader()) { + assertFalse(dummyOozie2.isLeader()); + } else if (dummyOozie2.isLeader()) { + assertFalse(zkjcs.isLeader()); + } else { + fail("No leader"); + } + } else if (dummyOozie2.isLeader()) { + assertFalse(zkjcs.isLeader()); + assertFalse(dummyOozie.isLeader()); + dummyOozie2.teardown(); + sleep(3 * 1000); + if (zkjcs.isLeader()) { + assertFalse(dummyOozie.isLeader()); + } else if (dummyOozie.isLeader()) { + assertFalse(zkjcs.isLeader()); + } else { + fail("No leader"); + } + } else { + fail("No leader"); + } } finally { zkjcs.destroy(); http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java index 946a4b9..6a2dd32 100644 --- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java @@ -17,6 +17,7 @@ */ package org.apache.oozie.test; +import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -25,6 +26,7 @@ import java.util.Map.Entry; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; import org.apache.curator.utils.EnsurePath; @@ -165,26 +167,43 @@ public abstract class ZKXTestCase extends XDataTestCase { /** * Provides a class that can pretend to be another Oozie Server as far as ZooKeeper and anything using ZKUtils is concerned. * You can specify the ID and URL of the Oozie Server. It will "start" when the constructor is called and can be "stopped" - * by calling {@link DummyZKOozie#teardown() }. Make sure to tear down any DummyZKOozies that you create. + * by calling {@link DummyZKOozie#teardown()}. It can also optionally join the ZKJobsConcurrencyService leader election. + * Make sure to tear down any DummyZKOozies that you create. */ protected class DummyZKOozie { private CuratorFramework client = null; private String zkId; private ServiceDiscovery<Map> sDiscovery; private String metadataUrl; + private LeaderLatch leaderLatch = null; /** - * Creates a DummyZKOozie. + * Creates a DummyZKOozie. Will not join the ZKJobsConcurrencyService leader election. * * @param zkId The ID of this new Oozie "server" * @param metadataUrl The URL to advertise for this "server" * @throws Exception */ public DummyZKOozie(String zkId, String metadataUrl) throws Exception { + this(zkId, metadataUrl, false); + } + + /** + * Creates a DummyZKOozie. + * + * @param zkId The ID of this new Oozie "server" + * @param metadataUrl The URL to advertise for this "server" + * @param joinConcurrencyLeaderElection true if should join ZKJobsConcurrencyService leader election; false if not + * @throws Exception + */ + public DummyZKOozie(String zkId, String metadataUrl, boolean joinConcurrencyLeaderElection) throws Exception { this.zkId = zkId; this.metadataUrl = metadataUrl; createClient(); advertiseService(); + if (joinConcurrencyLeaderElection) { + joinConcurrencyLeaderElection(); + } } private void createClient() throws Exception { @@ -221,11 +240,29 @@ public abstract class ZKXTestCase extends XDataTestCase { sleep(1000); // Sleep to allow ZKUtils ServiceCache to update } + private void joinConcurrencyLeaderElection() throws Exception { + leaderLatch = new LeaderLatch(client, "/services/concurrencyleader", zkId); + leaderLatch.start(); + } + + public boolean isLeader() { + if (leaderLatch != null) { + return leaderLatch.hasLeadership(); + } + throw new RuntimeException("Must join concurrency leader election"); + } + public void teardown() { + if (leaderLatch != null) { + try { + leaderLatch.close(); + } catch (IOException ioe) { + log.warn("Exception occured while leaving leader latch", ioe); + } + } try { unadvertiseService(); - } - catch (Exception ex) { + } catch (Exception ex) { log.warn("Exception occurred while unadvertising: " + ex.getMessage(), ex); } client.close(); http://git-wip-us.apache.org/repos/asf/oozie/blob/c25818ea/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 5ac93f2..c5a9fd8 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1916 Use Curator leader latch instead of checking the order of Oozie servers (rkanter) OOZIE-1886 Queue operation talking longer time (shwethags via rohini) OOZIE-1865 Oozie servers can't talk to each other with Oozie HA and Kerberos (rkanter) OOZIE-1821 Oozie java action fails due to AlreadyBeingCreatedException (abhishek.agarwal via rkanter)
