Author: virag
Date: Wed Mar  6 23:28:57 2013
New Revision: 1453618

URL: http://svn.apache.org/r1453618
Log:
OOZIE-1253 latest() gets resolved before all push dependencies are resolved 
(rohini via virag)

Modified:
    
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
    
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
    oozie/branches/branch-4.0/release-log.txt

Modified: 
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1453618&r1=1453617&r2=1453618&view=diff
==============================================================================
--- 
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
 (original)
+++ 
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
 Wed Mar  6 23:28:57 2013
@@ -130,6 +130,13 @@ public class CoordActionInputCheckXComma
                     + nonResolvedList.toString());
             // Updating the list of data dependencies that are available and 
those that are yet not
             boolean status = checkInput(actionXml, existList, nonExistList, 
actionConf);
+            String pushDeps = coordAction.getPushMissingDependencies();
+            // Resolve latest/future only when all current missingDependencies 
and
+            // pushMissingDependencies are met
+            if (status) {
+                status = (pushDeps == null || pushDeps.length() == 0) ? 
checkUnResolvedInput(actionXml, actionConf)
+                        : false;
+            }
             coordAction.setLastModifiedTime(currentTime);
             coordAction.setActionXml(actionXml.toString());
             if (nonResolvedList.length() > 0 && status == false) {
@@ -141,8 +148,7 @@ public class CoordActionInputCheckXComma
                 isChangeInDependency = true;
                 coordAction.setMissingDependencies(nonExistListStr);
             }
-            String pushDeps = coordAction.getPushMissingDependencies();
-            if (status == true && (pushDeps == null || pushDeps.length() == 
0)) {
+            if (status) {
                 String newActionXml = resolveCoordConfiguration(actionXml, 
actionConf, actionId);
                 actionXml.replace(0, actionXml.length(), newActionXml);
                 coordAction.setActionXml(actionXml.toString());
@@ -242,11 +248,13 @@ public class CoordActionInputCheckXComma
     protected boolean checkInput(StringBuilder actionXml, StringBuilder 
existList, StringBuilder nonExistList,
             Configuration conf) throws Exception {
         Element eAction = XmlUtils.parseXml(actionXml.toString());
-        boolean allExist = checkResolvedUris(eAction, existList, nonExistList, 
conf);
-        if (allExist) {
-            LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking 
Latest/future");
-            allExist = checkUnresolvedInstances(eAction, conf);
-        }
+        return checkResolvedUris(eAction, existList, nonExistList, conf);
+    }
+
+    private boolean checkUnResolvedInput(StringBuilder actionXml, 
Configuration conf) throws Exception {
+        Element eAction = XmlUtils.parseXml(actionXml.toString());
+        LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking 
Latest/future");
+        boolean allExist = checkUnresolvedInstances(eAction, conf);
         if (allExist) {
             actionXml.replace(0, actionXml.length(), 
XmlUtils.prettyPrint(eAction).toString());
         }

Modified: 
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1453618&r1=1453617&r2=1453618&view=diff
==============================================================================
--- 
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
 (original)
+++ 
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
 Wed Mar  6 23:28:57 2013
@@ -37,6 +37,7 @@ import org.apache.oozie.executor.jpa.Coo
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
+import 
org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.CallableQueueService;
@@ -213,33 +214,22 @@ public class TestCoordActionInputCheckXC
         CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, 
endTime, "latest");
         new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
 
-        CoordinatorActionBean action = null;
         JPAService jpaService = Services.get().get(JPAService.class);
-        try {
-            action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
-        }
-        catch (JPAExecutorException se) {
-            fail("Action ID " + job.getId() + "@1" + " was not stored properly 
in db");
-        }
-
+        CoordinatorActionBean action = jpaService
+                .execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + 
"${coord:latestRange(-3,0)}",
                 action.getMissingDependencies());
 
+        // Update action creation time
         String actionXML = action.getActionXml();
         String actionCreationTime = "2009-02-15T01:00" + TZ;
         actionXML = actionXML.replaceAll("action-actual-time=\".*\">", 
"action-actual-time=\"" + actionCreationTime
                 + "\">");
         action.setActionXml(actionXML);
         action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
-
-        try {
-            jpaService.execute(new 
CoordActionUpdateForInputCheckJPAExecutor(action));
-            action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
-            
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
 ;
-        }
-        catch (JPAExecutorException se) {
-            fail("Action ID " + job.getId() + "@1" + " was not stored properly 
in db");
-        }
+        jpaService.execute(new 
CoordActionUpdateForInputCheckJPAExecutor(action));
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
 ;
 
         // providing some of the dataset dirs required as per coordinator 
specification with holes
         // before and after action creation time
@@ -253,13 +243,8 @@ public class TestCoordActionInputCheckXC
         new CoordActionInputCheckXCommand(job.getId() + "@1", 
job.getId()).call();
         //Sleep for sometime as it gets requeued with 10ms delay on failure to 
acquire write lock
         Thread.sleep(1000);
-        try {
-            action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
-        }
-        catch (JPAExecutorException se) {
-            fail("Action ID " + job.getId() + "@1" + " was not stored properly 
in db");
-        }
 
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         actionXML = action.getActionXml();
         assertEquals("", action.getMissingDependencies());
         // Datasets only before action creation/actual time should be picked 
up.
@@ -272,6 +257,81 @@ public class TestCoordActionInputCheckXC
         assertEquals(resolvedList, 
actionXML.substring(actionXML.indexOf("<uris>") + 6, 
actionXML.indexOf("</uris>")));
     }
 
+    public void 
testActionInputCheckLatestActionCreationTimeWithPushDependency() throws 
Exception {
+        
Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME,
 false);
+
+        String jobId = "0000000-" + new Date().getTime() + 
"-TestCoordActionInputCheckXCommand-C";
+        Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
+        Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
+        CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, 
endTime, "latest");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+
+        // Set push missing dependency
+        JPAService jpaService = Services.get().get(JPAService.class);
+        CoordinatorActionBean action = jpaService
+                .execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        final String pushMissingDependency = "file://" + getTestCaseDir() + 
"/2009/02/05";
+        action.setPushMissingDependencies(pushMissingDependency);
+        jpaService.execute(new 
CoordActionUpdatePushInputCheckJPAExecutor(action));
+
+        // Update action creation time
+        String actionXML = action.getActionXml();
+        String actionCreationTime = "2009-02-15T01:00" + TZ;
+        actionXML = actionXML.replaceAll("action-actual-time=\".*\">", 
"action-actual-time=\"" + actionCreationTime
+                + "\">");
+        action.setActionXml(actionXML);
+        action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
+        jpaService.execute(new 
CoordActionUpdateForInputCheckJPAExecutor(action));
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
 ;
+
+        new CoordActionInputCheckXCommand(job.getId() + "@1", 
job.getId()).call();
+        new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + 
"${coord:latestRange(-3,0)}",
+                action.getMissingDependencies());
+        assertEquals(pushMissingDependency, 
action.getPushMissingDependencies());
+
+        // providing some of the dataset dirs required as per coordinator 
specification with holes
+        // before and after action creation time
+        createDir(getTestCaseDir() + "/2009/03/05/");
+        createDir(getTestCaseDir() + "/2009/02/19/");
+        createDir(getTestCaseDir() + "/2009/02/12/");
+        createDir(getTestCaseDir() + "/2009/01/22/");
+        createDir(getTestCaseDir() + "/2009/01/08/");
+        createDir(getTestCaseDir() + "/2009/01/01/");
+
+        // Run input check after making latest available
+        new CoordActionInputCheckXCommand(job.getId() + "@1", 
job.getId()).call();
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + 
"${coord:latestRange(-3,0)}",
+                action.getMissingDependencies());
+        assertEquals(pushMissingDependency, 
action.getPushMissingDependencies());
+
+        // Run input check after making push dependencies available
+        createDir(getTestCaseDir() + "/2009/02/05");
+        new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals("", action.getPushMissingDependencies());
+        checkCoordAction(job.getId() + "@1", 
CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
+                + "${coord:latestRange(-3,0)}", 
CoordinatorAction.Status.WAITING);
+        new CoordActionInputCheckXCommand(job.getId() + "@1", 
job.getId()).call();
+        //Sleep for sometime as it gets requeued with 10ms delay on failure to 
acquire write lock
+        Thread.sleep(1000);
+
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals("", action.getMissingDependencies());
+        actionXML = action.getActionXml();
+        // Datasets only before action creation/actual time should be picked 
up.
+        String resolvedList = "file://" + getTestCaseDir() + "/2009/02/12" + 
CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/05" + 
CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/01/22" + 
CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/01/08";
+        System.out.println("Expected: " + resolvedList);
+        System.out.println("Actual: " +  
actionXML.substring(actionXML.indexOf("<uris>") + 6, 
actionXML.indexOf("</uris>")));
+        assertEquals(resolvedList, 
actionXML.substring(actionXML.indexOf("<uris>") + 6, 
actionXML.indexOf("</uris>")));
+    }
+
     public void testActionInputCheckLatestCurrentTime() throws Exception {
         
Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME,
 true);
 
@@ -281,33 +341,22 @@ public class TestCoordActionInputCheckXC
         CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, 
endTime, "latest");
         new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
 
-        CoordinatorActionBean action = null;
         JPAService jpaService = Services.get().get(JPAService.class);
-        try {
-            action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
-        }
-        catch (JPAExecutorException se) {
-            fail("Action ID " + job.getId() + "@1" + " was not stored properly 
in db");
-        }
-
+        CoordinatorActionBean action = jpaService
+                .execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + 
"${coord:latestRange(-3,0)}",
                 action.getMissingDependencies());
 
+        // Update action creation time
         String actionXML = action.getActionXml();
         String actionCreationTime = "2009-02-15T01:00" + TZ;
         actionXML = actionXML.replaceAll("action-actual-time=\".*\">", 
"action-actual-time=\"" + actionCreationTime
                 + "\">");
         action.setActionXml(actionXML);
         action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
-
-        try {
-            jpaService.execute(new 
CoordActionUpdateForInputCheckJPAExecutor(action));
-            action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
-            
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
 ;
-        }
-        catch (JPAExecutorException se) {
-            fail("Action ID " + job.getId() + "@1" + " was not stored properly 
in db");
-        }
+        jpaService.execute(new 
CoordActionUpdateForInputCheckJPAExecutor(action));
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
 ;
 
         // providing some of the dataset dirs required as per coordinator
         // specification with holes
@@ -322,13 +371,8 @@ public class TestCoordActionInputCheckXC
         new CoordActionInputCheckXCommand(job.getId() + "@1", 
job.getId()).call();
         //Sleep for sometime as it gets requeued with 10ms delay on failure to 
acquire write lock
         Thread.sleep(1000);
-        try {
-            action = jpaService.execute(new 
CoordActionGetJPAExecutor(job.getId() + "@1"));
-        }
-        catch (JPAExecutorException se) {
-            fail("Action ID " + job.getId() + "@1" + " was not stored properly 
in db");
-        }
 
+        action = jpaService.execute(new CoordActionGetJPAExecutor(job.getId() 
+ "@1"));
         actionXML = action.getActionXml();
         assertEquals("", action.getMissingDependencies());
         // Datasets should be picked up based on current time and not action 
creation/actual time.
@@ -339,6 +383,81 @@ public class TestCoordActionInputCheckXC
         assertEquals(resolvedList, 
actionXML.substring(actionXML.indexOf("<uris>") + 6, 
actionXML.indexOf("</uris>")));
     }
 
+    public void testActionInputCheckLatestCurrentTimeWithPushDependency() 
throws Exception {
+        
Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME,
 true);
+
+        String jobId = "0000000-" + new Date().getTime() + 
"-TestCoordActionInputCheckXCommand-C";
+        Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
+        Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
+        CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, 
endTime, "latest");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+
+        // Set push missing dependency
+        JPAService jpaService = Services.get().get(JPAService.class);
+        CoordinatorActionBean action = jpaService
+                .execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        final String pushMissingDependency = "file://" + getTestCaseDir() + 
"/2009/02/05";
+        action.setPushMissingDependencies(pushMissingDependency);
+        jpaService.execute(new 
CoordActionUpdatePushInputCheckJPAExecutor(action));
+
+        // Update action creation time
+        String actionXML = action.getActionXml();
+        String actionCreationTime = "2009-02-15T01:00" + TZ;
+        actionXML = actionXML.replaceAll("action-actual-time=\".*\">", 
"action-actual-time=\"" + actionCreationTime
+                + "\">");
+        action.setActionXml(actionXML);
+        action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
+        jpaService.execute(new 
CoordActionUpdateForInputCheckJPAExecutor(action));
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
 ;
+
+        new CoordActionInputCheckXCommand(job.getId() + "@1", 
job.getId()).call();
+        new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + 
"${coord:latestRange(-3,0)}",
+                action.getMissingDependencies());
+        assertEquals(pushMissingDependency, 
action.getPushMissingDependencies());
+
+        // providing some of the dataset dirs required as per coordinator 
specification with holes
+        // before and after action creation time
+        createDir(getTestCaseDir() + "/2009/03/05/");
+        createDir(getTestCaseDir() + "/2009/02/19/");
+        createDir(getTestCaseDir() + "/2009/02/12/");
+        createDir(getTestCaseDir() + "/2009/01/22/");
+        createDir(getTestCaseDir() + "/2009/01/08/");
+        createDir(getTestCaseDir() + "/2009/01/01/");
+
+        // Run input check after making latest available
+        new CoordActionInputCheckXCommand(job.getId() + "@1", 
job.getId()).call();
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + 
"${coord:latestRange(-3,0)}",
+                action.getMissingDependencies());
+        assertEquals(pushMissingDependency, 
action.getPushMissingDependencies());
+
+        // Run input check after making push dependencies available
+        createDir(getTestCaseDir() + "/2009/02/05");
+        new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals("", action.getPushMissingDependencies());
+        checkCoordAction(job.getId() + "@1", 
CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
+                + "${coord:latestRange(-3,0)}", 
CoordinatorAction.Status.WAITING);
+        new CoordActionInputCheckXCommand(job.getId() + "@1", 
job.getId()).call();
+        //Sleep for sometime as it gets requeued with 10ms delay on failure to 
acquire write lock
+        Thread.sleep(1000);
+
+        action = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals("", action.getMissingDependencies());
+        actionXML = action.getActionXml();
+        // Datasets should be picked up based on current time and not action 
creation/actual time.
+        String resolvedList = "file://" + getTestCaseDir() + "/2009/03/05" + 
CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/19" + 
CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/12" + 
CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/05";
+        System.out.println("Expected: " + resolvedList);
+        System.out.println("Actual: " +  
actionXML.substring(actionXML.indexOf("<uris>") + 6, 
actionXML.indexOf("</uris>")));
+        assertEquals(resolvedList, 
actionXML.substring(actionXML.indexOf("<uris>") + 6, 
actionXML.indexOf("</uris>")));
+    }
+
     public void testActionInputCheckFuture() throws Exception {
         String jobId = "0000000-" + new Date().getTime() + 
"-TestCoordActionInputCheckXCommand-C";
         Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);

Modified: oozie/branches/branch-4.0/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/release-log.txt?rev=1453618&r1=1453617&r2=1453618&view=diff
==============================================================================
--- oozie/branches/branch-4.0/release-log.txt (original)
+++ oozie/branches/branch-4.0/release-log.txt Wed Mar  6 23:28:57 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.0.0 (unreleased)
 
+OOZIE-1253 latest() gets resolved before all push dependencies are resolved 
(rohini via virag)
 OOZIE-1251 Log messages for DependencyChecker class show wrong jobid and 
actionid (rohini via mona)
 OOZIE-1218 Create a HCatalog Integration Guide (rohini via virag)
 OOZIE-1250 Coord action timeout not happening when there is a exception 
(rohini via mona)


Reply via email to