Repository: oozie Updated Branches: refs/heads/master 659c45c4d -> ecedf6dc6
OOZIE-1715 Distributed ID sequence for HA (puru via rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ecedf6dc Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ecedf6dc Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ecedf6dc Branch: refs/heads/master Commit: ecedf6dc6f10c60298fd33653afc79b1f8fbc21d Parents: 659c45c Author: Robert Kanter <[email protected]> Authored: Tue Jun 10 13:57:14 2014 -0700 Committer: Robert Kanter <[email protected]> Committed: Tue Jun 10 13:57:14 2014 -0700 ---------------------------------------------------------------------- .../org/apache/oozie/service/UUIDService.java | 38 +++- .../org/apache/oozie/service/ZKUUIDService.java | 174 ++++++++++++++++ .../java/org/apache/oozie/util/ZKUtils.java | 11 +- core/src/main/resources/oozie-default.xml | 8 + .../apache/oozie/service/TestZKUUIDService.java | 203 +++++++++++++++++++ docs/src/site/twiki/AG_Install.twiki | 20 +- release-log.txt | 1 + 7 files changed, 443 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/main/java/org/apache/oozie/service/UUIDService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/UUIDService.java b/core/src/main/java/org/apache/oozie/service/UUIDService.java index 836815d..7489a53 100644 --- a/core/src/main/java/org/apache/oozie/service/UUIDService.java +++ b/core/src/main/java/org/apache/oozie/service/UUIDService.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -56,7 +56,7 @@ public class UUIDService implements Service { String genType = services.getConf().get(CONF_GENERATOR, "counter").trim(); if (genType.equals("counter")) { counter = new AtomicLong(); - startTime = new SimpleDateFormat("yyMMddHHmmssSSS").format(new Date()); + startTime = getStartTime(); } else { if (!genType.equals("random")) { @@ -76,6 +76,14 @@ public class UUIDService implements Service { } /** + * Get Server start time + * @return + */ + public String getStartTime() { + return new SimpleDateFormat("yyMMddHHmmssSSS").format(new Date()); + } + + /** * Return the public interface for UUID service. * * @return {@link UUIDService}. @@ -103,8 +111,20 @@ public class UUIDService implements Service { public String generateId(ApplicationType type) { StringBuilder sb = new StringBuilder(); + sb.append(getSequence()); + sb.append('-').append(systemId); + sb.append('-').append(type.getType()); + // limitation due to current DB schema for action ID length (100) + if (sb.length() > 40) { + throw new RuntimeException(XLog.format("ID exceeds limit of 40 characters, [{0}]", sb)); + } + return sb.toString(); + } + + public String getSequence() { + StringBuilder sb = new StringBuilder(); if (counter != null) { - sb.append(longPadding(counter.getAndIncrement())).append('-').append(startTime); + sb.append(longPadding(getID())).append('-').append(startTime); } else { sb.append(UUID.randomUUID().toString()); @@ -112,15 +132,13 @@ public class UUIDService implements Service { sb.setLength(37 - systemId.length()); } } - sb.append('-').append(systemId); - sb.append('-').append(type.getType()); - // limitation due to current DB schema for action ID length (100) - 
if (sb.length() > 40) { - throw new RuntimeException(XLog.format("ID exceeds limit of 40 characters, [{0}]", sb)); - } return sb.toString(); } + public long getID() { + return counter.getAndIncrement(); + } + /** * Create a child ID. * <p/> http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java new file mode 100644 index 0000000..33d782b --- /dev/null +++ b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.service; + +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.apache.curator.framework.recipes.atomic.AtomicValue; +import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.lock.LockToken; +import org.apache.oozie.util.XLog; +import org.apache.oozie.util.ZKUtils; + +/** + * Service that provides distributed job id sequence via ZooKeeper. Requires that a ZooKeeper ensemble is available. + * The sequence path will be located under a ZNode named "job_id_sequence" under the namespace (see {@link ZKUtils}). + * The sequence will be reset to 0, once max is reached. + */ + +public class ZKUUIDService extends UUIDService { + + public static final String CONF_PREFIX = Service.CONF_PREFIX + "ZKUUIDService."; + + public static final String CONF_SEQUENCE_MAX = CONF_PREFIX + "jobid.sequence.max"; + + public static final String ZK_SEQUENCE_PATH = "job_id_sequence"; + public static final long DEFULT_SEQUENCE_MAX = 99999999990l; + public static final long RESET_VALUE = 0l; + public static final int RETRY_COUNT = 3; + + private final static XLog LOG = XLog.getLog(ZKUUIDService.class); + + private ZKUtils zk; + Long maxSequence; + + DistributedAtomicLong atomicIdGenerator; + + @Override + public void init(Services services) throws ServiceException { + + super.init(services); + try { + zk = ZKUtils.register(this); + maxSequence = services.getConf().getLong(CONF_SEQUENCE_MAX, DEFULT_SEQUENCE_MAX); + atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), ZK_SEQUENCE_PATH, ZKUtils.getRetryPloicy()); + } + catch (Exception ex) { + throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); + } + + } + + /** + * Gets the unique id. 
+ * + * @return the id + * @throws Exception the exception + */ + public long getID() { + return getZKId(0); + } + + @SuppressWarnings("finally") + private long getZKId(int retryCount) { + if (atomicIdGenerator == null) { + throw new RuntimeException("Sequence generator can't be null. Path : " + ZK_SEQUENCE_PATH); + } + AtomicValue<Long> value = null; + try { + value = atomicIdGenerator.increment(); + } + catch (Exception e) { + throw new RuntimeException("Exception incrementing UID for session ", e); + } + finally { + if (value != null && value.succeeded()) { + if (value.preValue() >= maxSequence) { + if (retryCount >= RETRY_COUNT) { + throw new RuntimeException("Can't reset sequence. Tried " + retryCount + " times"); + } + resetSequence(); + return getZKId(retryCount + 1); + } + return value.preValue(); + } + else { + throw new RuntimeException("Exception incrementing UID for session "); + } + } + + } + + /** + * Once sequence is reached limit, reset to 0. + */ + private void resetSequence() { + synchronized (ZKUUIDService.class) { + try { + // Double check if sequence is already reset. + AtomicValue<Long> value = atomicIdGenerator.get(); + if (value.succeeded()) { + if (value.postValue() < maxSequence) { + return; + } + } + else { + throw new RuntimeException("Can't reset sequence"); + } + // Acquire ZK lock, so that other host doesn't reset sequence. 
+ LockToken lock = Services.get().get(MemoryLocksService.class) + .getWriteLock(ZKUUIDService.class.getName(), lockTimeout); + try { + if (lock == null) { + LOG.info("Lock is held by other system, returning"); + return; + } + else { + value = atomicIdGenerator.get(); + if (value.succeeded()) { + if (value.postValue() < maxSequence) { + return; + } + } + else { + throw new RuntimeException("Can't reset sequence"); + } + atomicIdGenerator.forceSet(RESET_VALUE); + } + } + finally { + if (lock != null) { + lock.release(); + } + } + } + catch (Exception e) { + throw new RuntimeException("Can't reset sequence", e); + } + } + } + + /** + * Get start time. + */ + public String getStartTime(){ + return new SimpleDateFormat("yyMMddHHmmss").format(new Date()); + } + + @Override + public void destroy() { + if (zk != null) { + zk.unregister(this); + } + zk = null; + super.destroy(); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/main/java/org/apache/oozie/util/ZKUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/ZKUtils.java b/core/src/main/java/org/apache/oozie/util/ZKUtils.java index 56055b8..885b656 100644 --- a/core/src/main/java/org/apache/oozie/util/ZKUtils.java +++ b/core/src/main/java/org/apache/oozie/util/ZKUtils.java @@ -153,7 +153,7 @@ public class ZKUtils { private void createClient() throws Exception { // Connect to the ZooKeeper server - RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + RetryPolicy retryPolicy = ZKUtils.getRetryPloicy(); String zkConnectionString = Services.get().getConf().get(ZK_CONNECTION_STRING, "localhost:2181"); String zkNamespace = Services.get().getConf().get(ZK_NAMESPACE, "oozie"); ACLProvider aclProvider; @@ -375,4 +375,13 @@ public class ZKUtils { return saslACL; } } + + /** + * Returns retry policy + * + * @return RetryPolicy + */ + public static RetryPolicy getRetryPloicy() { + return new 
ExponentialBackoffRetry(1000, 3); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 63a91fa..c9d9591 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2113,5 +2113,13 @@ </description> </property> + <property> + <name>oozie.service.ZKUUIDService.jobid.sequence.max</name> + <value>99999999990</value> + <description> + Maximum job id sequence for Oozie in HA mode. Current job id sequence is stored in ZK. Once the sequence reaches + maximum limit, server will reset job id sequence to 0. + </description> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java new file mode 100644 index 0000000..c41383c --- /dev/null +++ b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.service; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.oozie.service.UUIDService.ApplicationType; +import org.apache.oozie.test.ZKXTestCase; +import org.apache.oozie.util.ZKUtils; + +public class TestZKUUIDService extends ZKXTestCase { + + @Override + protected void setUp() throws Exception { + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + } + + public void testRegisterUnregister() throws Exception { + assertEquals(0, ZKUtils.getUsers().size()); + ZKUUIDService zkUUUIDService = new ZKUUIDService(); + try { + zkUUUIDService.init(Services.get()); + assertEquals(1, ZKUtils.getUsers().size()); + assertEquals(zkUUUIDService, ZKUtils.getUsers().iterator().next()); + zkUUUIDService.destroy(); + assertEquals(0, ZKUtils.getUsers().size()); + } + finally { + zkUUUIDService.destroy(); + } + } + + public void testIDGeneration() throws Exception { + ZKUUIDService uuid = new ZKUUIDService(); + try { + + setSystemProperty(UUIDService.CONF_GENERATOR, "counter"); + uuid.init(Services.get()); + String id = uuid.generateId(ApplicationType.WORKFLOW); + assertTrue(id.startsWith("0000000-")); + for (int i = 0; i < 1000; i++) { + id = uuid.generateId(ApplicationType.WORKFLOW); + } + assertTrue(id.startsWith("0001000-")); + } + finally { + uuid.destroy(); + } + } + + public void testMultipleIDGeneration() throws Exception { + ZKUUIDService uuid1 = new ZKUUIDService(); + ZKUUIDService uuid2 = new ZKUUIDService(); + + try { + 
setSystemProperty(UUIDService.CONF_GENERATOR, "counter"); + uuid1.init(Services.get()); + uuid2.init(Services.get()); + for (int i = 0; i < 1000; i += 2) { + String id1 = uuid1.generateId(ApplicationType.WORKFLOW); + String id2 = uuid2.generateId(ApplicationType.WORKFLOW); + assertEquals(Integer.parseInt(id1.substring(0, 7)), i); + assertEquals(Integer.parseInt(id2.substring(0, 7)), i + 1); + } + } + finally { + uuid1.destroy(); + uuid2.destroy(); + } + + } + + public void testMultipleIDGeneration_withMultiThread() throws Exception { + final List<Boolean> result = new ArrayList<Boolean>(10000); + final ZKUUIDService uuid1 = new ZKUUIDService(); + final ZKUUIDService uuid2 = new ZKUUIDService(); + setSystemProperty(UUIDService.CONF_GENERATOR, "counter"); + uuid1.init(Services.get()); + uuid2.init(Services.get()); + + try { + Thread t1 = new Thread() { + public void run() { + for (int i = 0; i < 5000; i++) { + String id = uuid1.generateId(ApplicationType.WORKFLOW); + result.add(Integer.parseInt(id.substring(0, 7)), true); + } + } + }; + Thread t2 = new Thread() { + public void run() { + for (int i = 0; i < 5000; i++) { + String id = uuid2.generateId(ApplicationType.WORKFLOW); + result.add(Integer.parseInt(id.substring(0, 7)), true); + } + } + }; + t1.start(); + t2.start(); + t1.join(); + t2.join(); + for (int i = 0; i < 10000; i++) { + assertTrue(result.get(i)); + } + } + finally { + uuid1.destroy(); + uuid2.destroy(); + } + } + + public void testResetSequence() throws Exception { + Services service = Services.get(); + service.setService(ZKLocksService.class); + ZKUUIDService uuid = new ZKUUIDService(); + try { + setSystemProperty(UUIDService.CONF_GENERATOR, "counter"); + Services.get().getConf().set(ZKUUIDService.CONF_SEQUENCE_MAX, "900"); + uuid.init(service); + String id = uuid.generateId(ApplicationType.WORKFLOW); + assertTrue(id.startsWith("0000000-")); + for (int i = 0; i < 1000; i++) { + id = uuid.generateId(ApplicationType.WORKFLOW); + } + 
assertTrue(id.startsWith("0000100-")); + } + finally { + uuid.destroy(); + } + } + + public void testResetSequence_withMultiThread() throws Exception { + Services service = Services.get(); + service.setService(ZKLocksService.class); + + final List<Integer> result = new ArrayList<Integer>(5000); + final ZKUUIDService uuid1 = new ZKUUIDService(); + final ZKUUIDService uuid2 = new ZKUUIDService(); + setSystemProperty(UUIDService.CONF_GENERATOR, "counter"); + Services.get().getConf().set(ZKUUIDService.CONF_SEQUENCE_MAX, "5000"); + + uuid1.init(service); + uuid2.init(service); + + for (int i = 0; i < 5000; i++) { + result.add(i, i); + } + + try { + Thread t1 = new Thread() { + public void run() { + for (int i = 0; i < 5000; i++) { + String id = uuid1.generateId(ApplicationType.WORKFLOW); + int index = Integer.parseInt(id.substring(0, 7)); + result.add(index, result.get(index) + 1); + } + } + }; + Thread t2 = new Thread() { + public void run() { + for (int i = 0; i < 5000; i++) { + String id = uuid2.generateId(ApplicationType.WORKFLOW); + int index = Integer.parseInt(id.substring(0, 7)); + result.add(index, result.get(index) + 1); + } + } + }; + t1.start(); + t2.start(); + t1.join(); + t2.join(); + for (int i = 0; i < 5000; i++) { + assertEquals(result.get(i), Integer.valueOf(2)); + } + } + finally { + uuid1.destroy(); + uuid2.destroy(); + } + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/docs/src/site/twiki/AG_Install.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/AG_Install.twiki b/docs/src/site/twiki/AG_Install.twiki index 89f7407..ce84e54 100644 --- a/docs/src/site/twiki/AG_Install.twiki +++ b/docs/src/site/twiki/AG_Install.twiki @@ -751,7 +751,8 @@ make Oozie use the ZooKeeper versions of these services instead of the default i <value> org.apache.oozie.service.ZKLocksService, org.apache.oozie.service.ZKXLogStreamingService, - org.apache.oozie.service.ZKJobsConcurrencyService + 
org.apache.oozie.service.ZKJobsConcurrencyService, + org.apache.oozie.service.ZKUUIDService </value> </property> </verbatim> @@ -831,6 +832,23 @@ kerberos.removeHostFromPrincipal=true kerberos.removeRealmFromPrincipal=true </verbatim> +---++++ JobId sequence +In HA mode, Oozie uses ZooKeeper (ZK) to generate the job id sequence. Job ids have the following format: +<Id sequence>-<yyMMddHHmmss(server start time)>-<system_id>-<W/C/B> + +Where <system_id> is configured as =oozie.system.id= (default is "oozie-" + "user.name"), and +W/C/B is a suffix on the job id indicating whether the generated job is a workflow, coordinator, or bundle. + +The maximum allowed length of a job id is 40 characters. The "Id sequence" is stored in ZK and is reset to 0 once the maximum job id sequence is +reached. The maximum job id sequence is configured as =oozie.service.ZKUUIDService.jobid.sequence.max=; the default value is 99999999990. + +<verbatim> +<property> + <name>oozie.service.ZKUUIDService.jobid.sequence.max</name> + <value>99999999990</value> +</property> +</verbatim> + ---++ Starting and Stopping Oozie Use the standard Tomcat commands to start and stop Oozie. http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 8071b46..74b5dfd 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1715 Distributed ID sequence for HA (puru via rkanter) OOZIE-1870 Workflow action doen't resolve retry-max and retry-interval (puru via rohini) OOZIE-1686 Typo in DG_CommandLineTool (anbu78 via ryota) OOZIE-1804 Improve documentation for Coordinator Specification (lars_francke via rkanter)
