Repository: oozie
Updated Branches:
  refs/heads/master 0695c2033 -> 5be0a8705


OOZIE-1812 Bulk API with bundle Id should relax regex check for Id (puru via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/5be0a870
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/5be0a870
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/5be0a870

Branch: refs/heads/master
Commit: 5be0a8705103e5bbabb84e72007643976fa3e9c7
Parents: 0695c20
Author: Rohini Palaniswamy <[email protected]>
Authored: Wed Jul 23 23:18:54 2014 -0700
Committer: Rohini Palaniswamy <[email protected]>
Committed: Wed Jul 23 23:18:54 2014 -0700

----------------------------------------------------------------------
 .../oozie/executor/jpa/BulkJPAExecutor.java     |  3 +-
 .../org/apache/oozie/service/UUIDService.java   |  8 ++--
 .../org/apache/oozie/service/ZKUUIDService.java | 23 +++++-----
 core/src/main/resources/oozie-default.xml       |  9 ----
 .../apache/oozie/service/TestZKUUIDService.java | 46 ++++++++++++++++++--
 .../org/apache/oozie/test/XDataTestCase.java    | 21 ++++++++-
 release-log.txt                                 |  1 +
 7 files changed, 79 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/5be0a870/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
index 8327aee..9074987 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
@@ -42,6 +42,7 @@ import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.StringBlob;
 import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.service.Services;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ParamChecker;
 
@@ -140,7 +141,7 @@ public class BulkJPAExecutor implements 
JPAExecutor<BulkResponseInfo> {
      * @return PARAM_TYPE
      */
     private PARAM_TYPE getParamType(String id, char job) {
-        Pattern p = Pattern.compile("\\d{7}-\\d{15}-oozie-[a-z]{4}-" + job);
+        Pattern p = Pattern.compile("\\d{7}-\\d{15}-" + 
Services.get().getSystemId() + "-" + job);
         Matcher m = p.matcher(id);
         if (m.matches()) {
             return PARAM_TYPE.ID;

http://git-wip-us.apache.org/repos/asf/oozie/blob/5be0a870/core/src/main/java/org/apache/oozie/service/UUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/UUIDService.java b/core/src/main/java/org/apache/oozie/service/UUIDService.java
index 7489a53..b5e593b 100644
--- a/core/src/main/java/org/apache/oozie/service/UUIDService.java
+++ b/core/src/main/java/org/apache/oozie/service/UUIDService.java
@@ -56,7 +56,7 @@ public class UUIDService implements Service {
         String genType = services.getConf().get(CONF_GENERATOR, 
"counter").trim();
         if (genType.equals("counter")) {
             counter = new AtomicLong();
-            startTime = getStartTime();
+            resetStartTime();
         }
         else {
             if (!genType.equals("random")) {
@@ -76,11 +76,11 @@ public class UUIDService implements Service {
     }
 
     /**
-     * Get Server start time
+     * reset start time
      * @return
      */
-    public String getStartTime() {
-        return new SimpleDateFormat("yyMMddHHmmssSSS").format(new Date());
+    protected void resetStartTime() {
+        startTime = new SimpleDateFormat("yyMMddHHmmssSSS").format(new Date());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/5be0a870/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
index 33d782b..0b2be64 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
@@ -18,9 +18,6 @@
 
 package org.apache.oozie.service;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
 import org.apache.curator.framework.recipes.atomic.AtomicValue;
 import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
 import org.apache.oozie.ErrorCode;
@@ -28,6 +25,8 @@ import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.ZKUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Service that provides distributed job id sequence via ZooKeeper.  Requires 
that a ZooKeeper ensemble is available.
  * The sequence path will be located under a ZNode named "job_id_sequence" 
under the namespace (see {@link ZKUtils}).
@@ -41,14 +40,14 @@ public class ZKUUIDService extends UUIDService {
     public static final String CONF_SEQUENCE_MAX = CONF_PREFIX + 
"jobid.sequence.max";
 
     public static final String ZK_SEQUENCE_PATH = "job_id_sequence";
-    public static final long DEFULT_SEQUENCE_MAX = 99999999990l;
+
     public static final long RESET_VALUE = 0l;
     public static final int RETRY_COUNT = 3;
 
     private final static XLog LOG = XLog.getLog(ZKUUIDService.class);
 
     private ZKUtils zk;
-    Long maxSequence;
+    private static Long maxSequence =  9999990l;
 
     DistributedAtomicLong atomicIdGenerator;
 
@@ -58,7 +57,6 @@ public class ZKUUIDService extends UUIDService {
         super.init(services);
         try {
             zk = ZKUtils.register(this);
-            maxSequence = services.getConf().getLong(CONF_SEQUENCE_MAX, 
DEFULT_SEQUENCE_MAX);
             atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), 
ZK_SEQUENCE_PATH, ZKUtils.getRetryPloicy());
         }
         catch (Exception ex) {
@@ -142,6 +140,7 @@ public class ZKUUIDService extends UUIDService {
                             throw new RuntimeException("Can't reset sequence");
                         }
                         atomicIdGenerator.forceSet(RESET_VALUE);
+                        resetStartTime();
                     }
                 }
                 finally {
@@ -156,13 +155,6 @@ public class ZKUUIDService extends UUIDService {
         }
     }
 
-    /**
-     * Get start time.
-     */
-    public String getStartTime(){
-        return new SimpleDateFormat("yyMMddHHmmss").format(new Date());
-    }
-
     @Override
     public void destroy() {
         if (zk != null) {
@@ -171,4 +163,9 @@ public class ZKUUIDService extends UUIDService {
         zk = null;
         super.destroy();
     }
+
+    @VisibleForTesting
+    public void setMaxSequence(long sequence) {
+        maxSequence = sequence;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/5be0a870/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 029ad1e..4a58e9b 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2166,13 +2166,4 @@
         </description>
     </property>
 
-    <property>
-        <name>oozie.service.ZKUUIDService.jobid.sequence.max</name>
-        <value>99999999990</value>
-        <description>
-        Maximum job id sequence for Oozie in HA mode. Current job id sequence 
is stored in ZK. Once the sequence reaches
-        maximum limit, server will reset job id sequence to 0.
-        </description>
-    </property>
-
 </configuration>

http://git-wip-us.apache.org/repos/asf/oozie/blob/5be0a870/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
index c41383c..67697bd 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
@@ -17,9 +17,18 @@
  */
 package org.apache.oozie.service;
 
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
+import org.apache.oozie.BulkResponseInfo;
+import org.apache.oozie.BundleEngine;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.executor.jpa.BulkJPAExecutor;
+import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.UUIDService.ApplicationType;
 import org.apache.oozie.test.ZKXTestCase;
 import org.apache.oozie.util.ZKUtils;
@@ -131,19 +140,23 @@ public class TestZKUUIDService extends ZKXTestCase {
     }
 
     public void testResetSequence() throws Exception {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyMMddHHmmssSSS");
         Services service = Services.get();
         service.setService(ZKLocksService.class);
         ZKUUIDService uuid = new ZKUUIDService();
         try {
             setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
-            Services.get().getConf().set(ZKUUIDService.CONF_SEQUENCE_MAX, 
"900");
+            uuid.setMaxSequence(900);
             uuid.init(service);
             String id = uuid.generateId(ApplicationType.WORKFLOW);
+            Date d = dateFormat.parse(id.split("-")[1]);
             assertTrue(id.startsWith("0000000-"));
             for (int i = 0; i < 1000; i++) {
                 id = uuid.generateId(ApplicationType.WORKFLOW);
             }
             assertTrue(id.startsWith("0000100-"));
+            Date newDate = dateFormat.parse(id.split("-")[1]);
+            assertTrue(newDate.after(d));
         }
         finally {
             uuid.destroy();
@@ -158,10 +171,10 @@ public class TestZKUUIDService extends ZKXTestCase {
         final ZKUUIDService uuid1 = new ZKUUIDService();
         final ZKUUIDService uuid2 = new ZKUUIDService();
         setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
-        Services.get().getConf().set(ZKUUIDService.CONF_SEQUENCE_MAX, "5000");
-
         uuid1.init(service);
         uuid2.init(service);
+        uuid1.setMaxSequence(5000);
+        uuid2.setMaxSequence(5000);
 
         for (int i = 0; i < 5000; i++) {
             result.add(i, i);
@@ -200,4 +213,31 @@ public class TestZKUUIDService extends ZKXTestCase {
         }
     }
 
+    public void testBulkJobForZKUUIDService() throws Exception {
+        Services service = Services.get();
+        ZKUUIDService uuid = new ZKUUIDService();
+        try {
+            setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+            uuid.init(service);
+            String bundleId = uuid.generateId(ApplicationType.BUNDLE);
+            BundleJobBean bundle=createBundleJob(bundleId, 
Job.Status.SUCCEEDED, false);
+            JPAService jpaService = Services.get().get(JPAService.class);
+            BundleJobInsertJPAExecutor bundleInsertjpa = new 
BundleJobInsertJPAExecutor(bundle);
+            jpaService.execute(bundleInsertjpa);
+            addCoordForBulkMonitor(bundleId);
+            String request = "bundle=" + bundleId;
+            BulkJPAExecutor bulkjpa = new 
BulkJPAExecutor(BundleEngine.parseBulkFilter(request), 1, 1);
+            try {
+                BulkResponseInfo response = jpaService.execute(bulkjpa);
+                
assertEquals(response.getResponses().get(0).getBundle().getId(), bundleId);
+            }
+            catch (JPAExecutorException jex) {
+                fail(); // should not throw exception as this case is now 
supported
+            }
+        }
+        finally {
+            uuid.destroy();
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/5be0a870/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index a19b877..7614f03 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -1290,7 +1290,7 @@ public abstract class XDataTestCase extends XHCatTestCase 
{
      * @return bundle job bean
      * @throws Exception
      */
-    protected BundleJobBean createBundleJob(Job.Status jobStatus, boolean 
pending) throws Exception {
+    protected BundleJobBean createBundleJob(String jobID, Job.Status 
jobStatus, boolean pending) throws Exception {
         Path coordPath1 = new Path(getFsTestCaseDir(), "coord1");
         Path coordPath2 = new Path(getFsTestCaseDir(), "coord2");
         writeCoordXml(coordPath1, "coord-job-bundle.xml");
@@ -1316,7 +1316,7 @@ public abstract class XDataTestCase extends XHCatTestCase 
{
         conf.set("appName", "bundle-app-name");
 
         BundleJobBean bundle = new BundleJobBean();
-        
bundle.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE));
+        bundle.setId(jobID);
         bundle.setAppName("BUNDLE-TEST");
         bundle.setAppPath(bundleAppPath.toString());
         bundle.setConf(XmlUtils.prettyPrint(conf).toString());
@@ -1342,6 +1342,18 @@ public abstract class XDataTestCase extends 
XHCatTestCase {
     }
 
     /**
+     * Create bundle job bean
+     * @param jobStatus
+     * @param pending
+     * @return
+     * @throws Exception
+     */
+    protected BundleJobBean createBundleJob(Job.Status jobStatus, boolean 
pending) throws Exception {
+        return 
createBundleJob(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE),
 jobStatus, pending);
+    }
+
+
+    /**
      * Create bundle job that contains bad coordinator jobs
      *
      * @param jobStatus
@@ -1454,7 +1466,12 @@ public abstract class XDataTestCase extends 
XHCatTestCase {
         BundleJobBean bundle = 
addRecordToBundleJobTable(BundleJob.Status.RUNNING, false);
         bundleId = bundle.getId();
         bundleName = bundle.getAppName();
+        addCoordForBulkMonitor(bundleId);
+    }
 
+    protected void addCoordForBulkMonitor(String bundleId) throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
         // adding coordinator job(s) for this bundle
         addRecordToCoordJobTableWithBundle(bundleId, "Coord1", 
CoordinatorJob.Status.RUNNING, true, true, 2);
         addRecordToCoordJobTableWithBundle(bundleId, "Coord2", 
CoordinatorJob.Status.RUNNING, true, true, 1);

http://git-wip-us.apache.org/repos/asf/oozie/blob/5be0a870/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 70a7dfa..8650116 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -4,6 +4,7 @@ OOZIE-1943 Bump up trunk to 4.2.0-SNAPSHOT (bzhang)
 
 -- Oozie 4.1.0 release (4.1 - unreleased)
 
+OOZIE-1812 Bulk API with bundle Id should relax regex check for Id (puru via rohini)
 OOZIE-1915 Move system properties to conf properties (puru via rohini)
 OOZIE-1934 coordinator action repeatedly picked up by cachePurgeWorker of 
PartitionDependencyManagerService (ryota)
 OOZIE-1933 SLACalculatorMemory HA changes assume SLARegistrationBean exists 
for all jobs (mona)

Reply via email to