Repository: oozie Updated Branches: refs/heads/master b87686b74 -> 9f150b0b1
Amendment to OOZIE-1847 Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9f150b0b Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9f150b0b Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9f150b0b Branch: refs/heads/master Commit: 9f150b0b150c813c7a973046cf2b864ee5e1d27c Parents: b87686b Author: egashira <[email protected]> Authored: Tue Oct 7 14:07:36 2014 -0700 Committer: egashira <[email protected]> Committed: Tue Oct 7 14:07:36 2014 -0700 ---------------------------------------------------------------------- .../event/listener/ZKConnectionListener.java | 25 +++++++++++++------- .../oozie/service/ZKJobsConcurrencyService.java | 5 ++-- .../apache/oozie/service/ZKLocksService.java | 10 +++++--- .../java/org/apache/oozie/util/ZKUtils.java | 16 ++++++++++--- core/src/main/resources/oozie-default.xml | 11 ++++++++- .../java/org/apache/oozie/test/ZKXTestCase.java | 3 ++- 6 files changed, 52 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/9f150b0b/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java b/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java index 1406b6a..a5d22c0 100644 --- a/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java +++ b/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java @@ -31,6 +31,8 @@ import org.apache.oozie.util.ZKUtils; public class ZKConnectionListener implements ConnectionStateListener { private XLog LOG = XLog.getLog(getClass()); + private static ConnectionState connectionState; + public static final String CONF_SHUTDOWN_ON_TIMEOUT = "oozie.zookeeper.server.shutdown.ontimeout"; public ZKConnectionListener() { LOG.info("ZKConnectionListener started"); @@ -38,13 +40,14 @@ public class ZKConnectionListener implements ConnectionStateListener { @Override public void stateChanged(final CuratorFramework client, final ConnectionState newState) { + connectionState = newState; LOG.trace("ZK connection status = " + newState.toString()); -// if (newState == ConnectionState.CONNECTED) { -// ZK connected -// } + // if (newState == ConnectionState.CONNECTED) { + // ZK connected + // } if (newState == ConnectionState.SUSPENDED) { LOG.warn("ZK connection is suspended, waiting for reconnect. If connection doesn't reconnect before " - + ZKUtils.getZKConnectionTimeout() + " Oozie server will shutdown itself"); + + ZKUtils.getZKConnectionTimeout() + " (sec) Oozie server will shutdown itself"); } if (newState == ConnectionState.RECONNECTED) { @@ -53,10 +56,16 @@ public class ZKConnectionListener implements ConnectionStateListener { } if (newState == ConnectionState.LOST) { - LOG.fatal("ZK is connection is not reconnected in " + ZKUtils.getZKConnectionTimeout() - + ", shutting down Oozie server"); - Services.get().destroy(); - System.exit(1); + LOG.fatal("ZK is not reconnected in " + ZKUtils.getZKConnectionTimeout()); + if (Services.get().getConf().getBoolean(CONF_SHUTDOWN_ON_TIMEOUT, true)) { + LOG.fatal("Shutting down Oozie server"); + Services.get().destroy(); + System.exit(1); + } } } + + public static ConnectionState getZKConnectionState() { + return connectionState; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/9f150b0b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java index 96fb373..a64f613 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.oozie.service; import java.util.ArrayList; @@ -26,9 +25,11 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.oozie.ErrorCode; import org.apache.oozie.client.rest.RestConstants; +import org.apache.oozie.event.listener.ZKConnectionListener; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.Instrumentable; import org.apache.oozie.util.Instrumentation; @@ -82,7 +83,7 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements */ @Override public void destroy() { - if (leaderLatch != null) { + if (leaderLatch != null && ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) { IOUtils.closeSafely(leaderLatch); } if (zk != null) { http://git-wip-us.apache.org/repos/asf/oozie/blob/9f150b0b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java index 36c00b9..6f333c8 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java @@ -15,24 +15,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.oozie.service; import java.util.HashMap; import java.util.concurrent.TimeUnit; + import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; import org.apache.oozie.ErrorCode; import org.apache.oozie.util.Instrumentable; import org.apache.oozie.util.Instrumentation; +import org.apache.oozie.event.listener.ZKConnectionListener; import org.apache.oozie.lock.LockToken; import org.apache.oozie.util.XLog; import org.apache.oozie.util.ZKUtils; + import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; + import org.apache.curator.framework.recipes.locks.ChildReaper; import org.apache.curator.framework.recipes.locks.Reaper; +import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.utils.ThreadUtils; + import com.google.common.annotations.VisibleForTesting; /** @@ -78,7 +83,7 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr */ @Override public void destroy() { - if (reaper != null) { + if (reaper != null && ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) { try { reaper.close(); } @@ -86,7 +91,6 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr LOG.error("Error closing childReaper", e); } } - if (zk != null) { zk.unregister(this); } http://git-wip-us.apache.org/repos/asf/oozie/blob/9f150b0b/core/src/main/java/org/apache/oozie/util/ZKUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/ZKUtils.java b/core/src/main/java/org/apache/oozie/util/ZKUtils.java index ec32092..ce38bf6 100644 --- a/core/src/main/java/org/apache/oozie/util/ZKUtils.java +++ b/core/src/main/java/org/apache/oozie/util/ZKUtils.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.oozie.util; import com.google.common.annotations.VisibleForTesting; @@ -35,6 +34,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.imps.DefaultACLProvider; +import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.EnsurePath; import org.apache.curator.x.discovery.ServiceCache; @@ -163,7 +163,9 @@ public class ZKUtils { // If there are no more classes using ZooKeeper, we should teardown everything. users.remove(user); if (users.isEmpty() && zk != null) { - zk.teardown(); + if (ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) { + zk.teardown(); + } zk = null; } } @@ -172,7 +174,7 @@ public class ZKUtils { // Connect to the ZooKeeper server RetryPolicy retryPolicy = ZKUtils.getRetryPloicy(); String zkConnectionString = Services.get().getConf().get(ZK_CONNECTION_STRING, "localhost:2181"); - String zkNamespace = Services.get().getConf().get(ZK_NAMESPACE, "oozie"); + String zkNamespace = getZKNameSpace(); zkConnectionTimeout = Services.get().getConf().getInt(ZK_CONNECTION_TIMEOUT, 180); ACLProvider aclProvider; @@ -405,6 +407,14 @@ public class ZKUtils { public static RetryPolicy getRetryPloicy() { return new ExponentialBackoffRetry(1000, 3); } + + /** + * Returns configured zk namesapces + * @return oozie.zookeeper.namespace + */ + public static String getZKNameSpace() { + return Services.get().getConf().get(ZK_NAMESPACE, "oozie"); + } /** * Return ZK connection timeout * @return http://git-wip-us.apache.org/repos/asf/oozie/blob/9f150b0b/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 06693bb..26eb7e0 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2069,7 +2069,16 @@ <value>180</value> <description> Default ZK connection timeout (in sec). If connection is lost for more than timeout, then Oozie server will shutdown - itself. + itself if oozie.zookeeper.server.shutdown.ontimeout is true. + </description> + </property> + + <property> + <name>oozie.zookeeper.server.shutdown.ontimeout</name> + <value>true</value> + <description> + If true, Oozie server will shutdown itself on ZK + connection timeout. </description> </property> http://git-wip-us.apache.org/repos/asf/oozie/blob/9f150b0b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java index 5697199..84d6e5d 100644 --- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.oozie.test; import java.io.IOException; @@ -35,6 +34,7 @@ import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.details.InstanceSerializer; +import org.apache.oozie.event.listener.ZKConnectionListener; import org.apache.oozie.service.Services; import org.apache.oozie.util.FixedJsonInstanceSerializer; import org.apache.oozie.util.ZKUtils; @@ -89,6 +89,7 @@ public abstract class ZKXTestCase extends XDataTestCase { zkServer = setupZKServer(); Services.get().getConf().set("oozie.zookeeper.connection.string", zkServer.getConnectString()); Services.get().getConf().set("oozie.instance.id", ZK_ID); + Services.get().getConf().setBoolean(ZKConnectionListener.CONF_SHUTDOWN_ON_TIMEOUT, false); createClient(); createServiceDiscovery(); }
