Repository: oozie
Updated Branches:
  refs/heads/branch-4.1 a8381b7dc -> 0d2f60152


OOZIE-1896 ZKUUIDService - Too many job submission fails


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0d2f6015
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0d2f6015
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0d2f6015

Branch: refs/heads/branch-4.1
Commit: 0d2f60152709783882d777f0a6ba73ba5de528b4
Parents: a8381b7
Author: Purshotam Shah <[email protected]>
Authored: Wed Oct 15 14:39:10 2014 -0700
Committer: Purshotam Shah <[email protected]>
Committed: Wed Oct 15 14:39:10 2014 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/service/UUIDService.java   |  22 ++-
 .../org/apache/oozie/service/ZKUUIDService.java | 168 +++++++++++++------
 .../java/org/apache/oozie/util/ZKUtils.java     |  14 +-
 .../apache/oozie/service/TestZKUUIDService.java |  87 +++++++++-
 release-log.txt                                 |   1 +
 5 files changed, 222 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/0d2f6015/core/src/main/java/org/apache/oozie/service/UUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/UUIDService.java 
b/core/src/main/java/org/apache/oozie/service/UUIDService.java
index b5e593b..4d209b5 100644
--- a/core/src/main/java/org/apache/oozie/service/UUIDService.java
+++ b/core/src/main/java/org/apache/oozie/service/UUIDService.java
@@ -41,7 +41,7 @@ public class UUIDService implements Service {
 
     public static final String CONF_GENERATOR = CONF_PREFIX + "generator";
 
-    private String startTime;
+    protected String startTime;
     private AtomicLong counter;
     private String systemId;
 
@@ -93,7 +93,7 @@ public class UUIDService implements Service {
         return UUIDService.class;
     }
 
-    private String longPadding(long number) {
+    protected String longPadding(long number) {
         StringBuilder sb = new StringBuilder();
         sb.append(number);
         if (sb.length() <= 7) {
@@ -121,10 +121,10 @@ public class UUIDService implements Service {
         return sb.toString();
     }
 
-    public String getSequence() {
+    private String getSequence() {
         StringBuilder sb = new StringBuilder();
         if (counter != null) {
-            sb.append(longPadding(getID())).append('-').append(startTime);
+            sb.append(createSequence());
         }
         else {
             sb.append(UUID.randomUUID().toString());
@@ -135,10 +135,20 @@ public class UUIDService implements Service {
         return sb.toString();
     }
 
-    public long getID() {
+    protected String createSequence() {
+        return appendTimeToSequence(getCounter(), startTime);
+    }
+
+    protected long getCounter() {
         return counter.getAndIncrement();
     }
 
+    protected String appendTimeToSequence(long id, String localStartTime) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(longPadding(id)).append('-').append(localStartTime);
+        return sb.toString();
+    }
+
     /**
      * Create a child ID.
      * <p/>
@@ -198,4 +208,4 @@ public class UUIDService implements Service {
             return type;
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d2f6015/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java 
b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
index 0b2be64..6cdd28e 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
@@ -18,8 +18,15 @@
 
 package org.apache.oozie.service;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.recipes.atomic.AtomicValue;
 import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.curator.framework.recipes.atomic.PromotedToLock;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.util.XLog;
@@ -28,9 +35,9 @@ import org.apache.oozie.util.ZKUtils;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
- * Service that provides distributed job id sequence via ZooKeeper.  Requires 
that a ZooKeeper ensemble is available.
- * The sequence path will be located under a ZNode named "job_id_sequence" 
under the namespace (see {@link ZKUtils}).
- * The sequence will be reset to 0, once max is reached.
+ * Service that provides distributed job id sequence via ZooKeeper. Requires 
that a ZooKeeper ensemble is available. The
+ * sequence path will be located under a ZNode named "job_id_sequence" under 
the namespace (see {@link ZKUtils}). The
+ * sequence will be reset to 0, once max is reached.
  */
 
 public class ZKUUIDService extends UUIDService {
@@ -38,26 +45,39 @@ public class ZKUUIDService extends UUIDService {
     public static final String CONF_PREFIX = Service.CONF_PREFIX + 
"ZKUUIDService.";
 
     public static final String CONF_SEQUENCE_MAX = CONF_PREFIX + 
"jobid.sequence.max";
+    public static final String LOCKS_NODE = "/SEQUENCE_LOCK";
 
     public static final String ZK_SEQUENCE_PATH = "job_id_sequence";
 
-    public static final long RESET_VALUE = 0l;
+    public static final long RESET_VALUE = 0L;
     public static final int RETRY_COUNT = 3;
 
     private final static XLog LOG = XLog.getLog(ZKUUIDService.class);
 
     private ZKUtils zk;
-    private static Long maxSequence =  9999990l;
+    private static Long maxSequence = 9999990L;
 
     DistributedAtomicLong atomicIdGenerator;
 
+    public static final ThreadLocal<SimpleDateFormat> dt = new 
ThreadLocal<SimpleDateFormat>() {
+        @Override
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyMMddHHmmssSSS");
+        }
+    };
+
+
     @Override
     public void init(Services services) throws ServiceException {
 
         super.init(services);
         try {
             zk = ZKUtils.register(this);
-            atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), 
ZK_SEQUENCE_PATH, ZKUtils.getRetryPloicy());
+            PromotedToLock.Builder lockBuilder = 
PromotedToLock.builder().lockPath(getPromotedLock())
+                    
.retryPolicy(getRetryPolicy()).timeout(Service.lockTimeout, 
TimeUnit.MILLISECONDS);
+            atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), 
ZK_SEQUENCE_PATH, getRetryPolicy(),
+                    lockBuilder.build());
+
         }
         catch (Exception ex) {
             throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
@@ -71,88 +91,106 @@ public class ZKUUIDService extends UUIDService {
      * @return the id
      * @throws Exception the exception
      */
-    public long getID() {
-        return getZKId(0);
+    @Override
+    protected String createSequence() {
+        String localStartTime = super.startTime;
+        long id = 0L;
+        try {
+            id = getZKSequence();
+        }
+        catch (Exception e) {
+            LOG.error("Error getting jobId, switching to old UUIDService", e);
+            id = super.getCounter();
+            localStartTime = dt.get().format(new Date());
+        }
+        return appendTimeToSequence(id, localStartTime);
+    }
+
+    protected synchronized long getZKSequence() throws Exception {
+        long id = getDistributedSequence();
+
+        if (id >= maxSequence) {
+            resetSequence();
+            id = getDistributedSequence();
+        }
+        return id;
     }
 
     @SuppressWarnings("finally")
-    private long getZKId(int retryCount) {
+    private long getDistributedSequence() throws Exception {
         if (atomicIdGenerator == null) {
-            throw new RuntimeException("Sequence generator can't be null. Path 
: " + ZK_SEQUENCE_PATH);
+            throw new Exception("Sequence generator can't be null. Path : " + 
ZK_SEQUENCE_PATH);
         }
         AtomicValue<Long> value = null;
         try {
             value = atomicIdGenerator.increment();
         }
         catch (Exception e) {
-            throw new RuntimeException("Exception incrementing UID for session 
", e);
+            throw new Exception("Exception incrementing UID for session ", e);
         }
         finally {
             if (value != null && value.succeeded()) {
-                if (value.preValue() >= maxSequence) {
-                    if (retryCount >= RETRY_COUNT) {
-                        throw new RuntimeException("Can't reset sequence. 
Tried " + retryCount + " times");
-                    }
-                    resetSequence();
-                    return getZKId(retryCount + 1);
-                }
                 return value.preValue();
             }
             else {
-                throw new RuntimeException("Exception incrementing UID for 
session ");
+                throw new Exception("Exception incrementing UID for session ");
             }
         }
-
     }
 
     /**
      * Once sequence is reached limit, reset to 0.
+     *
+     * @throws Exception if the sequence cannot be reset within RETRY_COUNT attempts
      */
-    private void resetSequence() {
-        synchronized (ZKUUIDService.class) {
+    private  void resetSequence() throws Exception {
+        for (int i = 0; i < RETRY_COUNT; i++) {
+            AtomicValue<Long> value = atomicIdGenerator.get();
+            if (value.succeeded()) {
+                if (value.postValue() < maxSequence) {
+                    return;
+                }
+            }
+            // Acquire ZK lock, so that other host doesn't reset sequence.
+            LockToken lock = null;
             try {
-                // Double check if sequence is already reset.
-                AtomicValue<Long> value = atomicIdGenerator.get();
-                if (value.succeeded()) {
-                    if (value.postValue() < maxSequence) {
-                        return;
-                    }
+                lock = Services.get().get(MemoryLocksService.class)
+                        .getWriteLock(ZKUUIDService.class.getName(), 
lockTimeout);
+            }
+            catch (InterruptedException e1) {
+                //ignore
+            }
+            try {
+                if (lock == null) {
+                    LOG.info("Lock is held by other system, will sleep and try 
again");
+                    Thread.sleep(1000);
+                    continue;
                 }
                 else {
-                    throw new RuntimeException("Can't reset sequence");
-                }
-                // Acquire ZK lock, so that other host doesn't reset sequence.
-                LockToken lock = Services.get().get(MemoryLocksService.class)
-                        .getWriteLock(ZKUUIDService.class.getName(), 
lockTimeout);
-                try {
-                    if (lock == null) {
-                        LOG.info("Lock is held by other system, returning");
-                        return;
-                    }
-                    else {
-                        value = atomicIdGenerator.get();
-                        if (value.succeeded()) {
-                            if (value.postValue() < maxSequence) {
-                                return;
-                            }
-                        }
-                        else {
-                            throw new RuntimeException("Can't reset sequence");
+                    value = atomicIdGenerator.get();
+                    if (value.succeeded()) {
+                        if (value.postValue() < maxSequence) {
+                            return;
                         }
+                    }
+                    try {
                         atomicIdGenerator.forceSet(RESET_VALUE);
-                        resetStartTime();
                     }
-                }
-                finally {
-                    if (lock != null) {
-                        lock.release();
+                    catch (Exception e) {
+                        LOG.info("Exception while resetting sequence, will try 
again");
+                        continue;
                     }
+                    resetStartTime();
+                    return;
                 }
             }
-            catch (Exception e) {
-                throw new RuntimeException("Can't reset sequence", e);
+            finally {
+                if (lock != null) {
+                    lock.release();
+                }
             }
         }
+        throw new Exception("Can't reset ID sequence in ZK. Retried " + 
RETRY_COUNT + " times");
     }
 
     @Override
@@ -164,8 +202,28 @@ public class ZKUUIDService extends UUIDService {
         super.destroy();
     }
 
+    public String getPromotedLock() {
+        if (ZKUtils.getZKNameSpace().startsWith("/")) {
+            return ZKUtils.getZKNameSpace() + LOCKS_NODE;
+
+        }
+        else {
+            return "/" + ZKUtils.getZKNameSpace() + LOCKS_NODE;
+        }
+    }
+
     @VisibleForTesting
     public void setMaxSequence(long sequence) {
         maxSequence = sequence;
     }
-}
+
+    /**
+     * Retries 25 times with delay of 200ms
+     *
+     * @return RetryNTimes
+     */
+    private static RetryPolicy getRetryPolicy() {
+        return new RetryNTimes(25, 200);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d2f6015/core/src/main/java/org/apache/oozie/util/ZKUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/ZKUtils.java 
b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
index 25ed2ee..a3368c4 100644
--- a/core/src/main/java/org/apache/oozie/util/ZKUtils.java
+++ b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
@@ -156,9 +156,9 @@ public class ZKUtils {
 
     private void createClient() throws Exception {
         // Connect to the ZooKeeper server
-        RetryPolicy retryPolicy = ZKUtils.getRetryPloicy();
+        RetryPolicy retryPolicy = ZKUtils.getRetryPolicy();
         String zkConnectionString = 
Services.get().getConf().get(ZK_CONNECTION_STRING, "localhost:2181");
-        String zkNamespace = Services.get().getConf().get(ZK_NAMESPACE, 
"oozie");
+        String zkNamespace = getZKNameSpace();
         ACLProvider aclProvider;
         if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
             log.info("Connecting to ZooKeeper with SASL/Kerberos and using 
'sasl' ACLs");
@@ -378,13 +378,21 @@ public class ZKUtils {
             return saslACL;
         }
     }
+    
+    /**
+     * Returns the configured ZooKeeper namespace.
+     * @return oozie.zookeeper.namespace
+     */
+    public static String getZKNameSpace() {
+        return Services.get().getConf().get(ZK_NAMESPACE, "oozie");
+    }
 
     /**
      * Returns retry policy
      *
      * @return RetryPolicy
      */
-    public static RetryPolicy getRetryPloicy() {
+    public static RetryPolicy getRetryPolicy() {
         return new ExponentialBackoffRetry(1000, 3);
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d2f6015/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java 
b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
index 67697bd..191f4e6 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
@@ -21,6 +21,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.oozie.BulkResponseInfo;
 import org.apache.oozie.BundleEngine;
@@ -167,7 +168,7 @@ public class TestZKUUIDService extends ZKXTestCase {
         Services service = Services.get();
         service.setService(ZKLocksService.class);
 
-        final List<Integer> result = new ArrayList<Integer>(5000);
+        final AtomicInteger result[] = new AtomicInteger[5000];
         final ZKUUIDService uuid1 = new ZKUUIDService();
         final ZKUUIDService uuid2 = new ZKUUIDService();
         setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
@@ -177,7 +178,7 @@ public class TestZKUUIDService extends ZKXTestCase {
         uuid2.setMaxSequence(5000);
 
         for (int i = 0; i < 5000; i++) {
-            result.add(i, i);
+            result[i]=new AtomicInteger(0);
         }
 
         try {
@@ -186,7 +187,7 @@ public class TestZKUUIDService extends ZKXTestCase {
                     for (int i = 0; i < 5000; i++) {
                         String id = uuid1.generateId(ApplicationType.WORKFLOW);
                         int index = Integer.parseInt(id.substring(0, 7));
-                        result.add(index, result.get(index) + 1);
+                        result[index].incrementAndGet();
                     }
                 }
             };
@@ -195,7 +196,7 @@ public class TestZKUUIDService extends ZKXTestCase {
                     for (int i = 0; i < 5000; i++) {
                         String id = uuid2.generateId(ApplicationType.WORKFLOW);
                         int index = Integer.parseInt(id.substring(0, 7));
-                        result.add(index, result.get(index) + 1);
+                        result[index].incrementAndGet();
                     }
                 }
             };
@@ -204,7 +205,7 @@ public class TestZKUUIDService extends ZKXTestCase {
             t1.join();
             t2.join();
             for (int i = 0; i < 5000; i++) {
-                assertEquals(result.get(i), Integer.valueOf(2));
+                assertEquals(result[i].get(), 2);
             }
         }
         finally {
@@ -220,7 +221,7 @@ public class TestZKUUIDService extends ZKXTestCase {
             setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
             uuid.init(service);
             String bundleId = uuid.generateId(ApplicationType.BUNDLE);
-            BundleJobBean bundle=createBundleJob(bundleId, 
Job.Status.SUCCEEDED, false);
+            BundleJobBean bundle = createBundleJob(bundleId, 
Job.Status.SUCCEEDED, false);
             JPAService jpaService = Services.get().get(JPAService.class);
             BundleJobInsertJPAExecutor bundleInsertjpa = new 
BundleJobInsertJPAExecutor(bundle);
             jpaService.execute(bundleInsertjpa);
@@ -240,4 +241,78 @@ public class TestZKUUIDService extends ZKXTestCase {
         }
     }
 
+    public void testFallback() throws Exception {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyMMddHHmmssSSS");
+
+        Services service = Services.get();
+        ZKUUIDServiceWithException uuid = new ZKUUIDServiceWithException();
+        try {
+            setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+            uuid.init(service);
+            String id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000000-"));
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000001-"));
+
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000002-"));
+
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000003-"));
+
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000004-"));
+            uuid.setThrowException();
+
+            Date beforeDate=new Date();
+            Thread.sleep(2000);
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000000-"));
+            assertTrue(dateFormat.parse(id.split("-")[1]).after(beforeDate));
+
+            beforeDate=new Date();
+            Thread.sleep(2000);
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000001-"));
+            assertTrue(dateFormat.parse(id.split("-")[1]).after(beforeDate));
+
+            uuid.resetThrowException();
+            beforeDate=new Date();
+            Thread.sleep(2000);
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000005-"));
+            assertTrue(dateFormat.parse(id.split("-")[1]).before(beforeDate));
+
+
+        }
+        finally {
+            uuid.destroy();
+        }
+    }
+
 }
+
+class ZKUUIDServiceWithException extends ZKUUIDService {
+    boolean throwEx = false;
+
+    public ZKUUIDServiceWithException() {
+
+    }
+
+    public void setThrowException() {
+        throwEx = true;
+    }
+
+    public void resetThrowException() {
+        throwEx = false;
+    }
+
+    @Override
+    protected long getZKSequence() throws Exception {
+        if (throwEx) {
+            throw new Exception("Can't generate UUID");
+        }
+        return super.getZKSequence();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/0d2f6015/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index b651b2c..6ab740c 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (4.1 - unreleased)
 
+OOZIE-1896 ZKUUIDService - Too many job submission fails (puru)
 OOZIE-2005 Coordinator rerun fails to initialize error code and message (ryota)
 OOZIE-2019 SLA miss processed on server2 not send email (puru)
 OOZIE-2026 fix synchronization in SLACalculatorMemory.addJobStatus to avoid 
duplicated SLA message (ryota)

Reply via email to