Repository: oozie Updated Branches: refs/heads/branch-4.1 0b763d79e -> 283a3c6a8
OOZIE-1812 Bulk API with bundle Id should relax regex check for Id (puru via rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/283a3c6a Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/283a3c6a Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/283a3c6a Branch: refs/heads/branch-4.1 Commit: 283a3c6a84b5ce87b9475c538ee396987c0184f9 Parents: 0b763d7 Author: Rohini Palaniswamy <[email protected]> Authored: Wed Jul 23 23:18:54 2014 -0700 Committer: Rohini Palaniswamy <[email protected]> Committed: Wed Jul 23 23:19:25 2014 -0700 ---------------------------------------------------------------------- .../oozie/executor/jpa/BulkJPAExecutor.java | 3 +- .../org/apache/oozie/service/UUIDService.java | 8 ++-- .../org/apache/oozie/service/ZKUUIDService.java | 23 +++++----- core/src/main/resources/oozie-default.xml | 9 ---- .../apache/oozie/service/TestZKUUIDService.java | 46 ++++++++++++++++++-- .../org/apache/oozie/test/XDataTestCase.java | 21 ++++++++- release-log.txt | 1 + 7 files changed, 79 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/283a3c6a/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java index 8327aee..9074987 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java @@ -42,6 +42,7 @@ import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.StringBlob; import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.service.Services; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.ParamChecker; @@ -140,7 +141,7 @@ public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> { * @return PARAM_TYPE */ private PARAM_TYPE getParamType(String id, char job) { - Pattern p = Pattern.compile("\\d{7}-\\d{15}-oozie-[a-z]{4}-" + job); + Pattern p = Pattern.compile("\\d{7}-\\d{15}-" + Services.get().getSystemId() + "-" + job); Matcher m = p.matcher(id); if (m.matches()) { return PARAM_TYPE.ID; http://git-wip-us.apache.org/repos/asf/oozie/blob/283a3c6a/core/src/main/java/org/apache/oozie/service/UUIDService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/UUIDService.java b/core/src/main/java/org/apache/oozie/service/UUIDService.java index 7489a53..b5e593b 100644 --- a/core/src/main/java/org/apache/oozie/service/UUIDService.java +++ b/core/src/main/java/org/apache/oozie/service/UUIDService.java @@ -56,7 +56,7 @@ public class UUIDService implements Service { String genType = services.getConf().get(CONF_GENERATOR, "counter").trim(); if (genType.equals("counter")) { counter = new AtomicLong(); - startTime = getStartTime(); + resetStartTime(); } else { if (!genType.equals("random")) { @@ -76,11 +76,11 @@ public class UUIDService implements Service { } /** - * Get Server start time + * reset start time * @return */ - public String getStartTime() { - return new SimpleDateFormat("yyMMddHHmmssSSS").format(new Date()); + protected void resetStartTime() { + startTime = new SimpleDateFormat("yyMMddHHmmssSSS").format(new Date()); } /** http://git-wip-us.apache.org/repos/asf/oozie/blob/283a3c6a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java index 33d782b..0b2be64 100644 --- a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java +++ b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java @@ -18,9 +18,6 @@ package org.apache.oozie.service; -import java.text.SimpleDateFormat; -import java.util.Date; - import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; import org.apache.oozie.ErrorCode; @@ -28,6 +25,8 @@ import org.apache.oozie.lock.LockToken; import org.apache.oozie.util.XLog; import org.apache.oozie.util.ZKUtils; +import com.google.common.annotations.VisibleForTesting; + /** * Service that provides distributed job id sequence via ZooKeeper. Requires that a ZooKeeper ensemble is available. * The sequence path will be located under a ZNode named "job_id_sequence" under the namespace (see {@link ZKUtils}). @@ -41,14 +40,14 @@ public class ZKUUIDService extends UUIDService { public static final String CONF_SEQUENCE_MAX = CONF_PREFIX + "jobid.sequence.max"; public static final String ZK_SEQUENCE_PATH = "job_id_sequence"; - public static final long DEFULT_SEQUENCE_MAX = 99999999990l; + public static final long RESET_VALUE = 0l; public static final int RETRY_COUNT = 3; private final static XLog LOG = XLog.getLog(ZKUUIDService.class); private ZKUtils zk; - Long maxSequence; + private static Long maxSequence = 9999990l; DistributedAtomicLong atomicIdGenerator; @@ -58,7 +57,6 @@ public class ZKUUIDService extends UUIDService { super.init(services); try { zk = ZKUtils.register(this); - maxSequence = services.getConf().getLong(CONF_SEQUENCE_MAX, DEFULT_SEQUENCE_MAX); atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), ZK_SEQUENCE_PATH, ZKUtils.getRetryPloicy()); } catch (Exception ex) { @@ -142,6 +140,7 @@ public class ZKUUIDService extends UUIDService { throw new RuntimeException("Can't reset sequence"); } atomicIdGenerator.forceSet(RESET_VALUE); + resetStartTime(); } } finally { @@ -156,13 +155,6 @@ public class ZKUUIDService extends UUIDService { } } - /** - * Get start time. - */ - public String getStartTime(){ - return new SimpleDateFormat("yyMMddHHmmss").format(new Date()); - } - @Override public void destroy() { if (zk != null) { @@ -171,4 +163,9 @@ public class ZKUUIDService extends UUIDService { zk = null; super.destroy(); } + + @VisibleForTesting + public void setMaxSequence(long sequence) { + maxSequence = sequence; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/283a3c6a/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 029ad1e..4a58e9b 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2166,13 +2166,4 @@ </description> </property> - <property> - <name>oozie.service.ZKUUIDService.jobid.sequence.max</name> - <value>99999999990</value> - <description> - Maximum job id sequence for Oozie in HA mode. Current job id sequence is stored in ZK. Once the sequence reaches - maximum limit, server will reset job id sequence to 0. - </description> - </property> - </configuration> http://git-wip-us.apache.org/repos/asf/oozie/blob/283a3c6a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java index c41383c..67697bd 100644 --- a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java +++ b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java @@ -17,9 +17,18 @@ */ package org.apache.oozie.service; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Date; import java.util.List; +import org.apache.oozie.BulkResponseInfo; +import org.apache.oozie.BundleEngine; +import org.apache.oozie.BundleJobBean; +import org.apache.oozie.client.Job; +import org.apache.oozie.executor.jpa.BulkJPAExecutor; +import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor; +import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.service.UUIDService.ApplicationType; import org.apache.oozie.test.ZKXTestCase; import org.apache.oozie.util.ZKUtils; @@ -131,19 +140,23 @@ public class TestZKUUIDService extends ZKXTestCase { } public void testResetSequence() throws Exception { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyMMddHHmmssSSS"); Services service = Services.get(); service.setService(ZKLocksService.class); ZKUUIDService uuid = new ZKUUIDService(); try { setSystemProperty(UUIDService.CONF_GENERATOR, "counter"); - Services.get().getConf().set(ZKUUIDService.CONF_SEQUENCE_MAX, "900"); + uuid.setMaxSequence(900); uuid.init(service); String id = uuid.generateId(ApplicationType.WORKFLOW); + Date d = dateFormat.parse(id.split("-")[1]); assertTrue(id.startsWith("0000000-")); for (int i = 0; i < 1000; i++) { id = uuid.generateId(ApplicationType.WORKFLOW); } assertTrue(id.startsWith("0000100-")); + Date newDate = dateFormat.parse(id.split("-")[1]); + assertTrue(newDate.after(d)); } finally { uuid.destroy(); @@ -158,10 +171,10 @@ public class TestZKUUIDService extends ZKXTestCase { final ZKUUIDService uuid1 = new ZKUUIDService(); final ZKUUIDService uuid2 = new ZKUUIDService(); setSystemProperty(UUIDService.CONF_GENERATOR, "counter"); - Services.get().getConf().set(ZKUUIDService.CONF_SEQUENCE_MAX, "5000"); - uuid1.init(service); uuid2.init(service); + uuid1.setMaxSequence(5000); + uuid2.setMaxSequence(5000); for (int i = 0; i < 5000; i++) { result.add(i, i); @@ -200,4 +213,31 @@ public class TestZKUUIDService extends ZKXTestCase { } } + public void testBulkJobForZKUUIDService() throws Exception { + Services service = Services.get(); + ZKUUIDService uuid = new ZKUUIDService(); + try { + setSystemProperty(UUIDService.CONF_GENERATOR, "counter"); + uuid.init(service); + String bundleId = uuid.generateId(ApplicationType.BUNDLE); + BundleJobBean bundle=createBundleJob(bundleId, Job.Status.SUCCEEDED, false); + JPAService jpaService = Services.get().get(JPAService.class); + BundleJobInsertJPAExecutor bundleInsertjpa = new BundleJobInsertJPAExecutor(bundle); + jpaService.execute(bundleInsertjpa); + addCoordForBulkMonitor(bundleId); + String request = "bundle=" + bundleId; + BulkJPAExecutor bulkjpa = new BulkJPAExecutor(BundleEngine.parseBulkFilter(request), 1, 1); + try { + BulkResponseInfo response = jpaService.execute(bulkjpa); + assertEquals(response.getResponses().get(0).getBundle().getId(), bundleId); + } + catch (JPAExecutorException jex) { + fail(); // should not throw exception as this case is now supported + } + } + finally { + uuid.destroy(); + } + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/283a3c6a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java index a19b877..7614f03 100644 --- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java @@ -1290,7 +1290,7 @@ public abstract class XDataTestCase extends XHCatTestCase { * @return bundle job bean * @throws Exception */ - protected BundleJobBean createBundleJob(Job.Status jobStatus, boolean pending) throws Exception { + protected BundleJobBean createBundleJob(String jobID, Job.Status jobStatus, boolean pending) throws Exception { Path coordPath1 = new Path(getFsTestCaseDir(), "coord1"); Path coordPath2 = new Path(getFsTestCaseDir(), "coord2"); writeCoordXml(coordPath1, "coord-job-bundle.xml"); @@ -1316,7 +1316,7 @@ public abstract class XDataTestCase extends XHCatTestCase { conf.set("appName", "bundle-app-name"); BundleJobBean bundle = new BundleJobBean(); - bundle.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE)); + bundle.setId(jobID); bundle.setAppName("BUNDLE-TEST"); bundle.setAppPath(bundleAppPath.toString()); bundle.setConf(XmlUtils.prettyPrint(conf).toString()); @@ -1342,6 +1342,18 @@ public abstract class XDataTestCase extends XHCatTestCase { } /** + * Create bundle job bean + * @param jobStatus + * @param pending + * @return + * @throws Exception + */ + protected BundleJobBean createBundleJob(Job.Status jobStatus, boolean pending) throws Exception { + return createBundleJob(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE), jobStatus, pending); + } + + + /** * Create bundle job that contains bad coordinator jobs * * @param jobStatus @@ -1454,7 +1466,12 @@ public abstract class XDataTestCase extends XHCatTestCase { BundleJobBean bundle = addRecordToBundleJobTable(BundleJob.Status.RUNNING, false); bundleId = bundle.getId(); bundleName = bundle.getAppName(); + addCoordForBulkMonitor(bundleId); + } + protected void addCoordForBulkMonitor(String bundleId) throws Exception { + JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); // adding coordinator job(s) for this bundle addRecordToCoordJobTableWithBundle(bundleId, "Coord1", CoordinatorJob.Status.RUNNING, true, true, 2); addRecordToCoordJobTableWithBundle(bundleId, "Coord2", CoordinatorJob.Status.RUNNING, true, true, 1); http://git-wip-us.apache.org/repos/asf/oozie/blob/283a3c6a/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index e7fc18c..6a3e48d 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1812 Bulk API with bundle Id should relax regex check for Id (puru via rohini) OOZIE-1915 Move system properties to conf properties (puru via rohini) OOZIE-1934 coordinator action repeatedly picked up by cachePurgeWorker of PartitionDependencyManagerService (ryota) OOZIE-1933 SLACalculatorMemory HA changes assume SLARegistrationBean exists for all jobs (mona)
