Repository: oozie
Updated Branches:
  refs/heads/master d1d14f998 -> c618e56eb


OOZIE-1896 ZKUUIDService - Too many job submission fails


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c618e56e
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c618e56e
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c618e56e

Branch: refs/heads/master
Commit: c618e56eb216d978495504cc2528c590c4287eab
Parents: d1d14f9
Author: Purshotam Shah <[email protected]>
Authored: Tue Oct 14 16:00:48 2014 -0700
Committer: Purshotam Shah <[email protected]>
Committed: Tue Oct 14 16:00:48 2014 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/service/UUIDService.java   |  23 ++-
 .../org/apache/oozie/service/ZKUUIDService.java | 169 +++++++++++++------
 .../java/org/apache/oozie/util/ZKUtils.java     |   4 +-
 .../apache/oozie/service/TestZKUUIDService.java |  88 +++++++++-
 release-log.txt                                 |   1 +
 5 files changed, 213 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/c618e56e/core/src/main/java/org/apache/oozie/service/UUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/UUIDService.java b/core/src/main/java/org/apache/oozie/service/UUIDService.java
index 55ac209..4d209b5 100644
--- a/core/src/main/java/org/apache/oozie/service/UUIDService.java
+++ b/core/src/main/java/org/apache/oozie/service/UUIDService.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.service;
 
 import java.text.SimpleDateFormat;
@@ -42,7 +41,7 @@ public class UUIDService implements Service {
 
     public static final String CONF_GENERATOR = CONF_PREFIX + "generator";
 
-    private String startTime;
+    protected String startTime;
     private AtomicLong counter;
     private String systemId;
 
@@ -94,7 +93,7 @@ public class UUIDService implements Service {
         return UUIDService.class;
     }
 
-    private String longPadding(long number) {
+    protected String longPadding(long number) {
         StringBuilder sb = new StringBuilder();
         sb.append(number);
         if (sb.length() <= 7) {
@@ -122,10 +121,10 @@ public class UUIDService implements Service {
         return sb.toString();
     }
 
-    public String getSequence() {
+    private String getSequence() {
         StringBuilder sb = new StringBuilder();
         if (counter != null) {
-            sb.append(longPadding(getID())).append('-').append(startTime);
+            sb.append(createSequence());
         }
         else {
             sb.append(UUID.randomUUID().toString());
@@ -136,10 +135,20 @@ public class UUIDService implements Service {
         return sb.toString();
     }
 
-    public long getID() {
+    protected String createSequence() {
+        return appendTimeToSequence(getCounter(), startTime);
+    }
+
+    protected long getCounter() {
         return counter.getAndIncrement();
     }
 
+    protected String appendTimeToSequence(long id, String localStartTime) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(longPadding(id)).append('-').append(localStartTime);
+        return sb.toString();
+    }
+
     /**
      * Create a child ID.
      * <p/>
@@ -199,4 +208,4 @@ public class UUIDService implements Service {
             return type;
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/c618e56e/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
index 69294ab..6cdd28e 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
@@ -16,11 +16,17 @@
  * limitations under the License.
  */
 
-
 package org.apache.oozie.service;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.recipes.atomic.AtomicValue;
 import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.curator.framework.recipes.atomic.PromotedToLock;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.util.XLog;
@@ -29,9 +35,9 @@ import org.apache.oozie.util.ZKUtils;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
- * Service that provides distributed job id sequence via ZooKeeper.  Requires that a ZooKeeper ensemble is available.
- * The sequence path will be located under a ZNode named "job_id_sequence" under the namespace (see {@link ZKUtils}).
- * The sequence will be reset to 0, once max is reached.
+ * Service that provides distributed job id sequence via ZooKeeper. Requires that a ZooKeeper ensemble is available. The
+ * sequence path will be located under a ZNode named "job_id_sequence" under the namespace (see {@link ZKUtils}). The
+ * sequence will be reset to 0, once max is reached.
  */
 
 public class ZKUUIDService extends UUIDService {
@@ -39,26 +45,39 @@ public class ZKUUIDService extends UUIDService {
     public static final String CONF_PREFIX = Service.CONF_PREFIX + "ZKUUIDService.";
 
     public static final String CONF_SEQUENCE_MAX = CONF_PREFIX + "jobid.sequence.max";
+    public static final String LOCKS_NODE = "/SEQUENCE_LOCK";
 
     public static final String ZK_SEQUENCE_PATH = "job_id_sequence";
 
-    public static final long RESET_VALUE = 0l;
+    public static final long RESET_VALUE = 0L;
     public static final int RETRY_COUNT = 3;
 
     private final static XLog LOG = XLog.getLog(ZKUUIDService.class);
 
     private ZKUtils zk;
-    private static Long maxSequence =  9999990l;
+    private static Long maxSequence = 9999990L;
 
     DistributedAtomicLong atomicIdGenerator;
 
+    public static final ThreadLocal<SimpleDateFormat> dt = new ThreadLocal<SimpleDateFormat>() {
+        @Override
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyMMddHHmmssSSS");
+        }
+    };
+
+
     @Override
     public void init(Services services) throws ServiceException {
 
         super.init(services);
         try {
             zk = ZKUtils.register(this);
-            atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), ZK_SEQUENCE_PATH, ZKUtils.getRetryPloicy());
+            PromotedToLock.Builder lockBuilder = PromotedToLock.builder().lockPath(getPromotedLock())
+                    .retryPolicy(getRetryPolicy()).timeout(Service.lockTimeout, TimeUnit.MILLISECONDS);
+            atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), ZK_SEQUENCE_PATH, getRetryPolicy(),
+                    lockBuilder.build());
+
         }
         catch (Exception ex) {
             throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
@@ -72,88 +91,106 @@ public class ZKUUIDService extends UUIDService {
      * @return the id
      * @throws Exception the exception
      */
-    public long getID() {
-        return getZKId(0);
+    @Override
+    protected String createSequence() {
+        String localStartTime = super.startTime;
+        long id = 0L;
+        try {
+            id = getZKSequence();
+        }
+        catch (Exception e) {
+            LOG.error("Error getting jobId, switching to old UUIDService", e);
+            id = super.getCounter();
+            localStartTime = dt.get().format(new Date());
+        }
+        return appendTimeToSequence(id, localStartTime);
+    }
+
+    protected synchronized long getZKSequence() throws Exception {
+        long id = getDistributedSequence();
+
+        if (id >= maxSequence) {
+            resetSequence();
+            id = getDistributedSequence();
+        }
+        return id;
     }
 
     @SuppressWarnings("finally")
-    private long getZKId(int retryCount) {
+    private long getDistributedSequence() throws Exception {
         if (atomicIdGenerator == null) {
-            throw new RuntimeException("Sequence generator can't be null. Path : " + ZK_SEQUENCE_PATH);
+            throw new Exception("Sequence generator can't be null. Path : " + ZK_SEQUENCE_PATH);
         }
         AtomicValue<Long> value = null;
         try {
             value = atomicIdGenerator.increment();
         }
         catch (Exception e) {
-            throw new RuntimeException("Exception incrementing UID for session ", e);
+            throw new Exception("Exception incrementing UID for session ", e);
         }
         finally {
             if (value != null && value.succeeded()) {
-                if (value.preValue() >= maxSequence) {
-                    if (retryCount >= RETRY_COUNT) {
-                        throw new RuntimeException("Can't reset sequence. Tried " + retryCount + " times");
-                    }
-                    resetSequence();
-                    return getZKId(retryCount + 1);
-                }
                 return value.preValue();
             }
             else {
-                throw new RuntimeException("Exception incrementing UID for session ");
+                throw new Exception("Exception incrementing UID for session ");
             }
         }
-
     }
 
     /**
      * Once sequence is reached limit, reset to 0.
+     *
+     * @throws Exception
      */
-    private void resetSequence() {
-        synchronized (ZKUUIDService.class) {
+    private  void resetSequence() throws Exception {
+        for (int i = 0; i < RETRY_COUNT; i++) {
+            AtomicValue<Long> value = atomicIdGenerator.get();
+            if (value.succeeded()) {
+                if (value.postValue() < maxSequence) {
+                    return;
+                }
+            }
+            // Acquire ZK lock, so that other host doesn't reset sequence.
+            LockToken lock = null;
             try {
-                // Double check if sequence is already reset.
-                AtomicValue<Long> value = atomicIdGenerator.get();
-                if (value.succeeded()) {
-                    if (value.postValue() < maxSequence) {
-                        return;
-                    }
+                lock = Services.get().get(MemoryLocksService.class)
+                        .getWriteLock(ZKUUIDService.class.getName(), lockTimeout);
+            }
+            catch (InterruptedException e1) {
+                //ignore
+            }
+            try {
+                if (lock == null) {
+                    LOG.info("Lock is held by other system, will sleep and try again");
+                    Thread.sleep(1000);
+                    continue;
                 }
                 else {
-                    throw new RuntimeException("Can't reset sequence");
-                }
-                // Acquire ZK lock, so that other host doesn't reset sequence.
-                LockToken lock = Services.get().get(MemoryLocksService.class)
-                        .getWriteLock(ZKUUIDService.class.getName(), lockTimeout);
-                try {
-                    if (lock == null) {
-                        LOG.info("Lock is held by other system, returning");
-                        return;
-                    }
-                    else {
-                        value = atomicIdGenerator.get();
-                        if (value.succeeded()) {
-                            if (value.postValue() < maxSequence) {
-                                return;
-                            }
-                        }
-                        else {
-                            throw new RuntimeException("Can't reset sequence");
+                    value = atomicIdGenerator.get();
+                    if (value.succeeded()) {
+                        if (value.postValue() < maxSequence) {
+                            return;
                         }
+                    }
+                    try {
                         atomicIdGenerator.forceSet(RESET_VALUE);
-                        resetStartTime();
                     }
-                }
-                finally {
-                    if (lock != null) {
-                        lock.release();
+                    catch (Exception e) {
+                        LOG.info("Exception while resetting sequence, will try again");
+                        continue;
                     }
+                    resetStartTime();
+                    return;
                 }
             }
-            catch (Exception e) {
-                throw new RuntimeException("Can't reset sequence", e);
+            finally {
+                if (lock != null) {
+                    lock.release();
+                }
             }
         }
+        throw new Exception("Can't reset ID sequence in ZK. Retried " + RETRY_COUNT + " times");
     }
 
     @Override
@@ -165,8 +202,28 @@ public class ZKUUIDService extends UUIDService {
         super.destroy();
     }
 
+    public String getPromotedLock() {
+        if (ZKUtils.getZKNameSpace().startsWith("/")) {
+            return ZKUtils.getZKNameSpace() + LOCKS_NODE;
+
+        }
+        else {
+            return "/" + ZKUtils.getZKNameSpace() + LOCKS_NODE;
+        }
+    }
+
     @VisibleForTesting
     public void setMaxSequence(long sequence) {
         maxSequence = sequence;
     }
-}
+
+    /**
+     * Retries 25 times with delay of 200ms
+     *
+     * @return RetryNTimes
+     */
+    private static RetryPolicy getRetryPolicy() {
+        return new RetryNTimes(25, 200);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/c618e56e/core/src/main/java/org/apache/oozie/util/ZKUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/ZKUtils.java b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
index ce38bf6..f535f86 100644
--- a/core/src/main/java/org/apache/oozie/util/ZKUtils.java
+++ b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
@@ -172,7 +172,7 @@ public class ZKUtils {
 
     private void createClient() throws Exception {
         // Connect to the ZooKeeper server
-        RetryPolicy retryPolicy = ZKUtils.getRetryPloicy();
+        RetryPolicy retryPolicy = ZKUtils.getRetryPolicy();
         String zkConnectionString = Services.get().getConf().get(ZK_CONNECTION_STRING, "localhost:2181");
         String zkNamespace = getZKNameSpace();
         zkConnectionTimeout = Services.get().getConf().getInt(ZK_CONNECTION_TIMEOUT, 180);
@@ -404,7 +404,7 @@ public class ZKUtils {
      *
      * @return RetryPolicy
      */
-    public static RetryPolicy getRetryPloicy() {
+    public static RetryPolicy getRetryPolicy() {
         return new ExponentialBackoffRetry(1000, 3);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/c618e56e/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
index 2952596..191f4e6 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.service;
 
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.oozie.BulkResponseInfo;
 import org.apache.oozie.BundleEngine;
@@ -168,7 +168,7 @@ public class TestZKUUIDService extends ZKXTestCase {
         Services service = Services.get();
         service.setService(ZKLocksService.class);
 
-        final List<Integer> result = new ArrayList<Integer>(5000);
+        final AtomicInteger result[] = new AtomicInteger[5000];
         final ZKUUIDService uuid1 = new ZKUUIDService();
         final ZKUUIDService uuid2 = new ZKUUIDService();
         setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
@@ -178,7 +178,7 @@ public class TestZKUUIDService extends ZKXTestCase {
         uuid2.setMaxSequence(5000);
 
         for (int i = 0; i < 5000; i++) {
-            result.add(i, i);
+            result[i]=new AtomicInteger(0);
         }
 
         try {
@@ -187,7 +187,7 @@ public class TestZKUUIDService extends ZKXTestCase {
                     for (int i = 0; i < 5000; i++) {
                         String id = uuid1.generateId(ApplicationType.WORKFLOW);
                         int index = Integer.parseInt(id.substring(0, 7));
-                        result.add(index, result.get(index) + 1);
+                        result[index].incrementAndGet();
                     }
                 }
             };
@@ -196,7 +196,7 @@ public class TestZKUUIDService extends ZKXTestCase {
                     for (int i = 0; i < 5000; i++) {
                         String id = uuid2.generateId(ApplicationType.WORKFLOW);
                         int index = Integer.parseInt(id.substring(0, 7));
-                        result.add(index, result.get(index) + 1);
+                        result[index].incrementAndGet();
                     }
                 }
             };
@@ -205,7 +205,7 @@ public class TestZKUUIDService extends ZKXTestCase {
             t1.join();
             t2.join();
             for (int i = 0; i < 5000; i++) {
-                assertEquals(result.get(i), Integer.valueOf(2));
+                assertEquals(result[i].get(), 2);
             }
         }
         finally {
@@ -221,7 +221,7 @@ public class TestZKUUIDService extends ZKXTestCase {
             setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
             uuid.init(service);
             String bundleId = uuid.generateId(ApplicationType.BUNDLE);
-            BundleJobBean bundle=createBundleJob(bundleId, Job.Status.SUCCEEDED, false);
+            BundleJobBean bundle = createBundleJob(bundleId, Job.Status.SUCCEEDED, false);
             JPAService jpaService = Services.get().get(JPAService.class);
             BundleJobInsertJPAExecutor bundleInsertjpa = new BundleJobInsertJPAExecutor(bundle);
             jpaService.execute(bundleInsertjpa);
@@ -241,4 +241,78 @@ public class TestZKUUIDService extends ZKXTestCase {
         }
     }
 
+    public void testFallback() throws Exception {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyMMddHHmmssSSS");
+
+        Services service = Services.get();
+        ZKUUIDServiceWithException uuid = new ZKUUIDServiceWithException();
+        try {
+            setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+            uuid.init(service);
+            String id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000000-"));
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000001-"));
+
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000002-"));
+
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000003-"));
+
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000004-"));
+            uuid.setThrowException();
+
+            Date beforeDate=new Date();
+            Thread.sleep(2000);
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000000-"));
+            assertTrue(dateFormat.parse(id.split("-")[1]).after(beforeDate));
+
+            beforeDate=new Date();
+            Thread.sleep(2000);
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000001-"));
+            assertTrue(dateFormat.parse(id.split("-")[1]).after(beforeDate));
+
+            uuid.resetThrowException();
+            beforeDate=new Date();
+            Thread.sleep(2000);
+            id = uuid.generateId(ApplicationType.BUNDLE);
+            assertTrue(id.startsWith("0000005-"));
+            assertTrue(dateFormat.parse(id.split("-")[1]).before(beforeDate));
+
+
+        }
+        finally {
+            uuid.destroy();
+        }
+    }
+
 }
+
+class ZKUUIDServiceWithException extends ZKUUIDService {
+    boolean throwEx = false;
+
+    public ZKUUIDServiceWithException() {
+
+    }
+
+    public void setThrowException() {
+        throwEx = true;
+    }
+
+    public void resetThrowException() {
+        throwEx = false;
+    }
+
+    @Override
+    protected long getZKSequence() throws Exception {
+        if (throwEx) {
+            throw new Exception("Can't generate UUID");
+        }
+        return super.getZKSequence();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/c618e56e/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index b6865bc..79bded0 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-1896 ZKUUIDService - Too many job submission fails (puru)
 OOZIE-2019 SLA miss processed on server2 not send email (puru)
 OOZIE-1391 Sub wf suspend doesn't update parent wf (jaydeepvishwakarma via shwethags)
 OOZIE-2023 Job rerun can stuck in prep (puru)

Reply via email to