Repository: oozie Updated Branches: refs/heads/branch-4.1 a8381b7dc -> 0d2f60152
OOZIE-1896 ZKUUIDService - Too many job submission fails Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0d2f6015 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0d2f6015 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0d2f6015 Branch: refs/heads/branch-4.1 Commit: 0d2f60152709783882d777f0a6ba73ba5de528b4 Parents: a8381b7 Author: Purshotam Shah <[email protected]> Authored: Wed Oct 15 14:39:10 2014 -0700 Committer: Purshotam Shah <[email protected]> Committed: Wed Oct 15 14:39:10 2014 -0700 ---------------------------------------------------------------------- .../org/apache/oozie/service/UUIDService.java | 22 ++- .../org/apache/oozie/service/ZKUUIDService.java | 168 +++++++++++++------ .../java/org/apache/oozie/util/ZKUtils.java | 14 +- .../apache/oozie/service/TestZKUUIDService.java | 87 +++++++++- release-log.txt | 1 + 5 files changed, 222 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/0d2f6015/core/src/main/java/org/apache/oozie/service/UUIDService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/UUIDService.java b/core/src/main/java/org/apache/oozie/service/UUIDService.java index b5e593b..4d209b5 100644 --- a/core/src/main/java/org/apache/oozie/service/UUIDService.java +++ b/core/src/main/java/org/apache/oozie/service/UUIDService.java @@ -41,7 +41,7 @@ public class UUIDService implements Service { public static final String CONF_GENERATOR = CONF_PREFIX + "generator"; - private String startTime; + protected String startTime; private AtomicLong counter; private String systemId; @@ -93,7 +93,7 @@ public class UUIDService implements Service { return UUIDService.class; } - private String longPadding(long number) { + protected String longPadding(long number) { StringBuilder sb = new StringBuilder(); sb.append(number); if (sb.length() <= 7) { @@ -121,10 +121,10 @@ public class UUIDService implements Service { return sb.toString(); } - public String getSequence() { + private String getSequence() { StringBuilder sb = new StringBuilder(); if (counter != null) { - sb.append(longPadding(getID())).append('-').append(startTime); + sb.append(createSequence()); } else { sb.append(UUID.randomUUID().toString()); @@ -135,10 +135,20 @@ public class UUIDService implements Service { return sb.toString(); } - public long getID() { + protected String createSequence() { + return appendTimeToSequence(getCounter(), startTime); + } + + protected long getCounter() { return counter.getAndIncrement(); } + protected String appendTimeToSequence(long id, String localStartTime) { + StringBuilder sb = new StringBuilder(); + sb.append(longPadding(id)).append('-').append(localStartTime); + return sb.toString(); + } + /** * Create a child ID. * <p/> @@ -198,4 +208,4 @@ public class UUIDService implements Service { return type; } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/0d2f6015/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java index 0b2be64..6cdd28e 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java @@ -18,8 +18,15 @@ package org.apache.oozie.service; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import org.apache.curator.RetryPolicy; import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; +import org.apache.curator.framework.recipes.atomic.PromotedToLock; +import org.apache.curator.retry.RetryNTimes; import org.apache.oozie.ErrorCode; import org.apache.oozie.lock.LockToken; import org.apache.oozie.util.XLog; @@ -28,9 +35,9 @@ import org.apache.oozie.util.ZKUtils; import com.google.common.annotations.VisibleForTesting; /** - * Service that provides distributed job id sequence via ZooKeeper. Requires that a ZooKeeper ensemble is available. - * The sequence path will be located under a ZNode named "job_id_sequence" under the namespace (see {@link ZKUtils}). - * The sequence will be reset to 0, once max is reached. + * Service that provides distributed job id sequence via ZooKeeper. Requires that a ZooKeeper ensemble is available. The + * sequence path will be located under a ZNode named "job_id_sequence" under the namespace (see {@link ZKUtils}). The + * sequence will be reset to 0, once max is reached. */ public class ZKUUIDService extends UUIDService { @@ -38,26 +45,39 @@ public class ZKUUIDService extends UUIDService { public static final String CONF_PREFIX = Service.CONF_PREFIX + "ZKUUIDService."; public static final String CONF_SEQUENCE_MAX = CONF_PREFIX + "jobid.sequence.max"; + public static final String LOCKS_NODE = "/SEQUENCE_LOCK"; public static final String ZK_SEQUENCE_PATH = "job_id_sequence"; - public static final long RESET_VALUE = 0l; + public static final long RESET_VALUE = 0L; public static final int RETRY_COUNT = 3; private final static XLog LOG = XLog.getLog(ZKUUIDService.class); private ZKUtils zk; - private static Long maxSequence = 9999990l; + private static Long maxSequence = 9999990L; DistributedAtomicLong atomicIdGenerator; + public static final ThreadLocal<SimpleDateFormat> dt = new ThreadLocal<SimpleDateFormat>() { + @Override + protected SimpleDateFormat initialValue() { + return new SimpleDateFormat("yyMMddHHmmssSSS"); + } + }; + + @Override public void init(Services services) throws ServiceException { super.init(services); try { zk = ZKUtils.register(this); - atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), ZK_SEQUENCE_PATH, ZKUtils.getRetryPloicy()); + PromotedToLock.Builder lockBuilder = PromotedToLock.builder().lockPath(getPromotedLock()) + .retryPolicy(getRetryPolicy()).timeout(Service.lockTimeout, TimeUnit.MILLISECONDS); + atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), ZK_SEQUENCE_PATH, getRetryPolicy(), + lockBuilder.build()); + } catch (Exception ex) { throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); @@ -71,88 +91,106 @@ public class ZKUUIDService extends UUIDService { * @return the id * @throws Exception the exception */ - public long getID() { - return getZKId(0); + @Override + protected String createSequence() { + String localStartTime = super.startTime; + long id = 0L; + try { + id = getZKSequence(); + } + catch (Exception e) { + LOG.error("Error getting jobId, switching to old UUIDService", e); + id = super.getCounter(); + localStartTime = dt.get().format(new Date()); + } + return appendTimeToSequence(id, localStartTime); + } + + protected synchronized long getZKSequence() throws Exception { + long id = getDistributedSequence(); + + if (id >= maxSequence) { + resetSequence(); + id = getDistributedSequence(); + } + return id; } @SuppressWarnings("finally") - private long getZKId(int retryCount) { + private long getDistributedSequence() throws Exception { if (atomicIdGenerator == null) { - throw new RuntimeException("Sequence generator can't be null. Path : " + ZK_SEQUENCE_PATH); + throw new Exception("Sequence generator can't be null. Path : " + ZK_SEQUENCE_PATH); } AtomicValue<Long> value = null; try { value = atomicIdGenerator.increment(); } catch (Exception e) { - throw new RuntimeException("Exception incrementing UID for session ", e); + throw new Exception("Exception incrementing UID for session ", e); } finally { if (value != null && value.succeeded()) { - if (value.preValue() >= maxSequence) { - if (retryCount >= RETRY_COUNT) { - throw new RuntimeException("Can't reset sequence. Tried " + retryCount + " times"); - } - resetSequence(); - return getZKId(retryCount + 1); - } return value.preValue(); } else { - throw new RuntimeException("Exception incrementing UID for session "); + throw new Exception("Exception incrementing UID for session "); } } - } /** * Once sequence is reached limit, reset to 0. + * + * @throws Exception */ - private void resetSequence() { - synchronized (ZKUUIDService.class) { + private void resetSequence() throws Exception { + for (int i = 0; i < RETRY_COUNT; i++) { + AtomicValue<Long> value = atomicIdGenerator.get(); + if (value.succeeded()) { + if (value.postValue() < maxSequence) { + return; + } + } + // Acquire ZK lock, so that other host doesn't reset sequence. + LockToken lock = null; try { - // Double check if sequence is already reset. - AtomicValue<Long> value = atomicIdGenerator.get(); - if (value.succeeded()) { - if (value.postValue() < maxSequence) { - return; - } + lock = Services.get().get(MemoryLocksService.class) + .getWriteLock(ZKUUIDService.class.getName(), lockTimeout); + } + catch (InterruptedException e1) { + //ignore + } + try { + if (lock == null) { + LOG.info("Lock is held by other system, will sleep and try again"); + Thread.sleep(1000); + continue; } else { - throw new RuntimeException("Can't reset sequence"); - } - // Acquire ZK lock, so that other host doesn't reset sequence. - LockToken lock = Services.get().get(MemoryLocksService.class) - .getWriteLock(ZKUUIDService.class.getName(), lockTimeout); - try { - if (lock == null) { - LOG.info("Lock is held by other system, returning"); - return; - } - else { - value = atomicIdGenerator.get(); - if (value.succeeded()) { - if (value.postValue() < maxSequence) { - return; - } - } - else { - throw new RuntimeException("Can't reset sequence"); + value = atomicIdGenerator.get(); + if (value.succeeded()) { + if (value.postValue() < maxSequence) { + return; } + } + try { atomicIdGenerator.forceSet(RESET_VALUE); - resetStartTime(); } - } - finally { - if (lock != null) { - lock.release(); + catch (Exception e) { + LOG.info("Exception while resetting sequence, will try again"); + continue; } + resetStartTime(); + return; } } - catch (Exception e) { - throw new RuntimeException("Can't reset sequence", e); + finally { + if (lock != null) { + lock.release(); + } } } + throw new Exception("Can't reset ID sequence in ZK. Retried " + RETRY_COUNT + " times"); } @Override @@ -164,8 +202,28 @@ public class ZKUUIDService extends UUIDService { super.destroy(); } + public String getPromotedLock() { + if (ZKUtils.getZKNameSpace().startsWith("/")) { + return ZKUtils.getZKNameSpace() + LOCKS_NODE; + + } + else { + return "/" + ZKUtils.getZKNameSpace() + LOCKS_NODE; + } + } + @VisibleForTesting public void setMaxSequence(long sequence) { maxSequence = sequence; } -} + + /** + * Retries 25 times with delay of 200ms + * + * @return RetryNTimes + */ + private static RetryPolicy getRetryPolicy() { + return new RetryNTimes(25, 200); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/0d2f6015/core/src/main/java/org/apache/oozie/util/ZKUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/ZKUtils.java b/core/src/main/java/org/apache/oozie/util/ZKUtils.java index 25ed2ee..a3368c4 100644 --- a/core/src/main/java/org/apache/oozie/util/ZKUtils.java +++ b/core/src/main/java/org/apache/oozie/util/ZKUtils.java @@ -156,9 +156,9 @@ public class ZKUtils { private void createClient() throws Exception { // Connect to the ZooKeeper server - RetryPolicy retryPolicy = ZKUtils.getRetryPloicy(); + RetryPolicy retryPolicy = ZKUtils.getRetryPolicy(); String zkConnectionString = Services.get().getConf().get(ZK_CONNECTION_STRING, "localhost:2181"); - String zkNamespace = Services.get().getConf().get(ZK_NAMESPACE, "oozie"); + String zkNamespace = getZKNameSpace(); ACLProvider aclProvider; if (Services.get().getConf().getBoolean(ZK_SECURE, false)) { log.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs"); @@ -378,13 +378,21 @@ public class ZKUtils { return saslACL; } } + + /** + * Returns configured zk namesapces + * @return oozie.zookeeper.namespace + */ + public static String getZKNameSpace() { + return Services.get().getConf().get(ZK_NAMESPACE, "oozie"); + } /** * Returns retry policy * * @return RetryPolicy */ - public static RetryPolicy getRetryPloicy() { + public static RetryPolicy getRetryPolicy() { return new ExponentialBackoffRetry(1000, 3); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/0d2f6015/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java index 67697bd..191f4e6 100644 --- a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java +++ b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java @@ -21,6 +21,7 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.oozie.BulkResponseInfo; import org.apache.oozie.BundleEngine; @@ -167,7 +168,7 @@ public class TestZKUUIDService extends ZKXTestCase { Services service = Services.get(); service.setService(ZKLocksService.class); - final List<Integer> result = new ArrayList<Integer>(5000); + final AtomicInteger result[] = new AtomicInteger[5000]; final ZKUUIDService uuid1 = new ZKUUIDService(); final ZKUUIDService uuid2 = new ZKUUIDService(); setSystemProperty(UUIDService.CONF_GENERATOR, "counter"); @@ -177,7 +178,7 @@ public class TestZKUUIDService extends ZKXTestCase { uuid2.setMaxSequence(5000); for (int i = 0; i < 5000; i++) { - result.add(i, i); + result[i]=new AtomicInteger(0); } try { @@ -186,7 +187,7 @@ public class TestZKUUIDService extends ZKXTestCase { for (int i = 0; i < 5000; i++) { String id = uuid1.generateId(ApplicationType.WORKFLOW); int index = Integer.parseInt(id.substring(0, 7)); - result.add(index, result.get(index) + 1); + result[index].incrementAndGet(); } } }; @@ -195,7 +196,7 @@ public class TestZKUUIDService extends ZKXTestCase { for (int i = 0; i < 5000; i++) { String id = uuid2.generateId(ApplicationType.WORKFLOW); int index = Integer.parseInt(id.substring(0, 7)); - result.add(index, result.get(index) + 1); + result[index].incrementAndGet(); } } }; @@ -204,7 +205,7 @@ public class TestZKUUIDService extends ZKXTestCase { t1.join(); t2.join(); for (int i = 0; i < 5000; i++) { - assertEquals(result.get(i), Integer.valueOf(2)); + assertEquals(result[i].get(), 2); } } finally { @@ -220,7 +221,7 @@ public class TestZKUUIDService extends ZKXTestCase { setSystemProperty(UUIDService.CONF_GENERATOR, "counter"); uuid.init(service); String bundleId = uuid.generateId(ApplicationType.BUNDLE); - BundleJobBean bundle=createBundleJob(bundleId, Job.Status.SUCCEEDED, false); + BundleJobBean bundle = createBundleJob(bundleId, Job.Status.SUCCEEDED, false); JPAService jpaService = Services.get().get(JPAService.class); BundleJobInsertJPAExecutor bundleInsertjpa = new BundleJobInsertJPAExecutor(bundle); jpaService.execute(bundleInsertjpa); @@ -240,4 +241,78 @@ public class TestZKUUIDService extends ZKXTestCase { } } + public void testFallback() throws Exception { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyMMddHHmmssSSS"); + + Services service = Services.get(); + ZKUUIDServiceWithException uuid = new ZKUUIDServiceWithException(); + try { + setSystemProperty(UUIDService.CONF_GENERATOR, "counter"); + uuid.init(service); + String id = uuid.generateId(ApplicationType.BUNDLE); + assertTrue(id.startsWith("0000000-")); + id = uuid.generateId(ApplicationType.BUNDLE); + assertTrue(id.startsWith("0000001-")); + + id = uuid.generateId(ApplicationType.BUNDLE); + assertTrue(id.startsWith("0000002-")); + + id = uuid.generateId(ApplicationType.BUNDLE); + assertTrue(id.startsWith("0000003-")); + + id = uuid.generateId(ApplicationType.BUNDLE); + assertTrue(id.startsWith("0000004-")); + uuid.setThrowException(); + + Date beforeDate=new Date(); + Thread.sleep(2000); + id = uuid.generateId(ApplicationType.BUNDLE); + assertTrue(id.startsWith("0000000-")); + assertTrue(dateFormat.parse(id.split("-")[1]).after(beforeDate)); + + beforeDate=new Date(); + Thread.sleep(2000); + id = uuid.generateId(ApplicationType.BUNDLE); + assertTrue(id.startsWith("0000001-")); + assertTrue(dateFormat.parse(id.split("-")[1]).after(beforeDate)); + + uuid.resetThrowException(); + beforeDate=new Date(); + Thread.sleep(2000); + id = uuid.generateId(ApplicationType.BUNDLE); + assertTrue(id.startsWith("0000005-")); + assertTrue(dateFormat.parse(id.split("-")[1]).before(beforeDate)); + + + } + finally { + uuid.destroy(); + } + } + } + +class ZKUUIDServiceWithException extends ZKUUIDService { + boolean throwEx = false; + + public ZKUUIDServiceWithException() { + + } + + public void setThrowException() { + throwEx = true; + } + + public void resetThrowException() { + throwEx = false; + } + + @Override + protected long getZKSequence() throws Exception { + if (throwEx) { + throw new Exception("Can't generate UUID"); + } + return super.getZKSequence(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/0d2f6015/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index b651b2c..6ab740c 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (4.1 - unreleased) +OOZIE-1896 ZKUUIDService - Too many job submission fails (puru) OOZIE-2005 Coordinator rerun fails to initialize error code and message (ryota) OOZIE-2019 SLA miss processed on server2 not send email (puru) OOZIE-2026 fix synchronization in SLACalculatorMemory.addJobStatus to avoid duplicated SLA message (ryota)
