Repository: oozie
Updated Branches:
  refs/heads/master b87686b74 -> 9f150b0b1


Amendment to OOZIE-1847


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9f150b0b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9f150b0b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9f150b0b

Branch: refs/heads/master
Commit: 9f150b0b150c813c7a973046cf2b864ee5e1d27c
Parents: b87686b
Author: egashira <[email protected]>
Authored: Tue Oct 7 14:07:36 2014 -0700
Committer: egashira <[email protected]>
Committed: Tue Oct 7 14:07:36 2014 -0700

----------------------------------------------------------------------
 .../event/listener/ZKConnectionListener.java    | 25 +++++++++++++-------
 .../oozie/service/ZKJobsConcurrencyService.java |  5 ++--
 .../apache/oozie/service/ZKLocksService.java    | 10 +++++---
 .../java/org/apache/oozie/util/ZKUtils.java     | 16 ++++++++++---
 core/src/main/resources/oozie-default.xml       | 11 ++++++++-
 .../java/org/apache/oozie/test/ZKXTestCase.java |  3 ++-
 6 files changed, 52 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/9f150b0b/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java 
b/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java
index 1406b6a..a5d22c0 100644
--- 
a/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java
+++ 
b/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java
@@ -31,6 +31,8 @@ import org.apache.oozie.util.ZKUtils;
 public class ZKConnectionListener implements ConnectionStateListener {
 
     private XLog LOG = XLog.getLog(getClass());
+    private static ConnectionState connectionState;
+    public static final String CONF_SHUTDOWN_ON_TIMEOUT = 
"oozie.zookeeper.server.shutdown.ontimeout";
 
     public ZKConnectionListener() {
         LOG.info("ZKConnectionListener started");
@@ -38,13 +40,14 @@ public class ZKConnectionListener implements 
ConnectionStateListener {
 
     @Override
     public void stateChanged(final CuratorFramework client, final 
ConnectionState newState) {
+        connectionState = newState;
         LOG.trace("ZK connection status  = " + newState.toString());
-//        if (newState == ConnectionState.CONNECTED) {
-//             ZK connected
-//        }
+        // if (newState == ConnectionState.CONNECTED) {
+        // ZK connected
+        // }
         if (newState == ConnectionState.SUSPENDED) {
             LOG.warn("ZK connection is suspended, waiting for reconnect. If 
connection doesn't reconnect before "
-                    + ZKUtils.getZKConnectionTimeout() + " Oozie server will 
shutdown itself");
+                    + ZKUtils.getZKConnectionTimeout() + " (sec) Oozie server 
will shutdown itself");
         }
 
         if (newState == ConnectionState.RECONNECTED) {
@@ -53,10 +56,16 @@ public class ZKConnectionListener implements 
ConnectionStateListener {
         }
 
         if (newState == ConnectionState.LOST) {
-            LOG.fatal("ZK is connection is not reconnected in " + 
ZKUtils.getZKConnectionTimeout()
-                    + ", shutting down Oozie server");
-            Services.get().destroy();
-            System.exit(1);
+            LOG.fatal("ZK is not reconnected in " + 
ZKUtils.getZKConnectionTimeout());
+            if (Services.get().getConf().getBoolean(CONF_SHUTDOWN_ON_TIMEOUT, 
true)) {
+                LOG.fatal("Shutting down Oozie server");
+                Services.get().destroy();
+                System.exit(1);
+            }
         }
     }
+
+    public static ConnectionState getZKConnectionState() {
+        return connectionState;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/9f150b0b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java 
b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
index 96fb373..a64f613 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.service;
 
 import java.util.ArrayList;
@@ -26,9 +25,11 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.event.listener.ZKConnectionListener;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
@@ -82,7 +83,7 @@ public class ZKJobsConcurrencyService extends 
JobsConcurrencyService implements
      */
     @Override
     public void destroy() {
-        if (leaderLatch != null) {
+        if (leaderLatch != null && ZKConnectionListener.getZKConnectionState() 
!= ConnectionState.LOST) {
             IOUtils.closeSafely(leaderLatch);
         }
         if (zk != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/9f150b0b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java 
b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
index 36c00b9..6f333c8 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
@@ -15,24 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.service;
 
 import java.util.HashMap;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
+import org.apache.oozie.event.listener.ZKConnectionListener;
 import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.ZKUtils;
+
 import java.io.IOException;
 import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.curator.framework.recipes.locks.ChildReaper;
 import org.apache.curator.framework.recipes.locks.Reaper;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.utils.ThreadUtils;
+
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -78,7 +83,7 @@ public class ZKLocksService extends MemoryLocksService 
implements Service, Instr
      */
     @Override
     public void destroy() {
-        if (reaper != null) {
+        if (reaper != null && ZKConnectionListener.getZKConnectionState() != 
ConnectionState.LOST) {
             try {
                 reaper.close();
             }
@@ -86,7 +91,6 @@ public class ZKLocksService extends MemoryLocksService 
implements Service, Instr
                 LOG.error("Error closing childReaper", e);
             }
         }
-
         if (zk != null) {
             zk.unregister(this);
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/9f150b0b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/ZKUtils.java 
b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
index ec32092..ce38bf6 100644
--- a/core/src/main/java/org/apache/oozie/util/ZKUtils.java
+++ b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.util;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -35,6 +34,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.x.discovery.ServiceCache;
@@ -163,7 +163,9 @@ public class ZKUtils {
         // If there are no more classes using ZooKeeper, we should teardown 
everything.
         users.remove(user);
         if (users.isEmpty() && zk != null) {
-            zk.teardown();
+            if (ZKConnectionListener.getZKConnectionState() != 
ConnectionState.LOST) {
+                zk.teardown();
+            }
             zk = null;
         }
     }
@@ -172,7 +174,7 @@ public class ZKUtils {
         // Connect to the ZooKeeper server
         RetryPolicy retryPolicy = ZKUtils.getRetryPloicy();
         String zkConnectionString = 
Services.get().getConf().get(ZK_CONNECTION_STRING, "localhost:2181");
-        String zkNamespace = Services.get().getConf().get(ZK_NAMESPACE, 
"oozie");
+        String zkNamespace = getZKNameSpace();
         zkConnectionTimeout = 
Services.get().getConf().getInt(ZK_CONNECTION_TIMEOUT, 180);
 
         ACLProvider aclProvider;
@@ -405,6 +407,14 @@ public class ZKUtils {
     public static RetryPolicy getRetryPloicy() {
         return new ExponentialBackoffRetry(1000, 3);
     }
+
+    /**
+     * Returns the configured ZK namespace
+     * @return oozie.zookeeper.namespace
+     */
+    public static String getZKNameSpace() {
+        return Services.get().getConf().get(ZK_NAMESPACE, "oozie");
+    }
     /**
      * Return ZK connection timeout
      * @return

http://git-wip-us.apache.org/repos/asf/oozie/blob/9f150b0b/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index 06693bb..26eb7e0 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2069,7 +2069,16 @@
         <value>180</value>
         <description>
         Default ZK connection timeout (in sec). If connection is lost for more 
than timeout, then Oozie server will shutdown
-        itself.
+        itself if oozie.zookeeper.server.shutdown.ontimeout is true.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.zookeeper.server.shutdown.ontimeout</name>
+        <value>true</value>
+        <description>
+            If true, the Oozie server will shut itself down on a ZK
+            connection timeout.
         </description>
     </property>
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/9f150b0b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java 
b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
index 5697199..84d6e5d 100644
--- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.test;
 
 import java.io.IOException;
@@ -35,6 +34,7 @@ import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.curator.x.discovery.details.InstanceSerializer;
+import org.apache.oozie.event.listener.ZKConnectionListener;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.FixedJsonInstanceSerializer;
 import org.apache.oozie.util.ZKUtils;
@@ -89,6 +89,7 @@ public abstract class ZKXTestCase extends XDataTestCase {
         zkServer = setupZKServer();
         Services.get().getConf().set("oozie.zookeeper.connection.string", 
zkServer.getConnectString());
         Services.get().getConf().set("oozie.instance.id", ZK_ID);
+        
Services.get().getConf().setBoolean(ZKConnectionListener.CONF_SHUTDOWN_ON_TIMEOUT,
 false);
         createClient();
         createServiceDiscovery();
     }

Reply via email to