Author: virag
Date: Tue Mar 12 01:21:17 2013
New Revision: 1455384
URL: http://svn.apache.org/r1455384
Log:
OOZIE-1261 Registered push dependencies are not removed on Coord Kill command
(virag)
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsNotCompletedJPAExecutor.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java
oozie/branches/branch-4.0/release-log.txt
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1455384&r1=1455383&r2=1455384&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java Tue Mar 12 01:21:17 2013
@@ -105,7 +105,7 @@ import org.apache.openjpa.persistence.jd
// Query to maintain backward compatibility for coord job info command
@NamedQuery(name =
"GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select
OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by
a.nominalTimestamp"),
// Query to retrieve action id, action status, pending status and
external Id of not completed Coordinator actions
- @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select
a.id, a.status, a.pending, a.externalId from CoordinatorActionBean a where
a.jobId = :jobId AND a.status <> 'FAILED' AND a.status <> 'TIMEDOUT' AND
a.status <> 'SUCCEEDED' AND a.status <> 'KILLED'"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select
a.id, a.status, a.pending, a.externalId, a.pushMissingDependencies from
CoordinatorActionBean a where a.jobId = :jobId AND a.status <> 'FAILED' AND
a.status <> 'TIMEDOUT' AND a.status <> 'SUCCEEDED' AND a.status <> 'KILLED'"),
// Query to retrieve action id, action status, pending status and
external Id of running Coordinator actions
@NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id,
a.status, a.pending, a.externalId from CoordinatorActionBean a where a.jobId =
:jobId and a.status = 'RUNNING'"),
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1455384&r1=1455383&r2=1455384&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java Tue Mar 12 01:21:17 2013
@@ -28,6 +28,7 @@ import org.apache.oozie.command.wf.KillX
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.KillTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.dependency.DependencyChecker;
import
org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
@@ -38,6 +39,7 @@ import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.StatusUtils;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
@@ -132,6 +134,11 @@ public class CoordKillXCommand extends K
LOG.debug("Killed coord action = [{0}], current status =
[{1}], pending = [{2}]",
action.getId(), action.getStatus(),
action.getPending());
}
+ String pushMissingDeps = action.getPushMissingDependencies();
+ if (pushMissingDeps != null) {
+ CoordPushDependencyCheckXCommand.unregisterMissingDependencies(
+ Arrays.asList(DependencyChecker.dependenciesAsArray(pushMissingDeps)), action.getId());
+ }
}
}
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1455384&r1=1455383&r2=1455384&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Tue Mar 12 01:21:17 2013
@@ -49,6 +49,7 @@ import org.apache.oozie.service.URIHandl
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
public class CoordPushDependencyCheckXCommand extends
CoordinatorXCommand<Void> {
protected String actionId;
@@ -144,7 +145,7 @@ public class CoordPushDependencyCheckXCo
unregisterAvailableDependencies(actionDep);
}
if (timeout) {
- unregisterMissingDependencies(actionDep.getMissingDependencies());
+ unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId);
}
}
catch (Exception e) {
@@ -152,7 +153,7 @@ public class CoordPushDependencyCheckXCo
LOG.debug("Queueing timeout command");
// XCommand.queue() will not work when there is a Exception
Services.get().get(CallableQueueService.class).queue(new
CoordActionTimeOutXCommand(coordAction));
- unregisterMissingDependencies(Arrays.asList(missingDepsArray));
+ unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId);
}
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
@@ -274,7 +275,8 @@ public class CoordPushDependencyCheckXCo
}
}
- private void unregisterMissingDependencies(List<String> missingDeps) {
+ public static void unregisterMissingDependencies(List<String> missingDeps,
String actionId) {
+ final XLog LOG = XLog.getLog(CoordPushDependencyCheckXCommand.class);
URIHandlerService uriService =
Services.get().get(URIHandlerService.class);
for (String missingDep : missingDeps) {
try {
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsNotCompletedJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsNotCompletedJPAExecutor.java?rev=1455384&r1=1455383&r2=1455384&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsNotCompletedJPAExecutor.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsNotCompletedJPAExecutor.java Tue Mar 12 01:21:17 2013
@@ -90,6 +90,9 @@ public class CoordJobGetActionsNotComple
if (arr[3] != null) {
bean.setExternalId((String) arr[3]);
}
+ if (arr[4] != null) {
+ bean.setPushMissingDependencies((String) arr[4]);
+ }
return bean;
}
Modified:
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java?rev=1455384&r1=1455383&r2=1455384&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordKillXCommand.java Tue Mar 12 01:21:17 2013
@@ -29,16 +29,22 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.dependency.FSURIHandler;
+import org.apache.oozie.dependency.HCatURIHandler;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.StatusTransitService;
+import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.HCatURI;
import org.apache.oozie.workflow.WorkflowInstance;
public class TestCoordKillXCommand extends XDataTestCase {
@@ -333,4 +339,53 @@ public class TestCoordKillXCommand exten
assertTrue(callable3.executed == 0);
}
+
+ public void testCoordKillRemovePushMissingDeps() throws Exception {
+ try {
+ services.destroy();
+ services = super.setupServicesForHCatalog();
+ services.init();
+ String db = "default";
+ String table = "tablename";
+ String server = "hcatserver";
+ String newHCatDependency1 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120430;country=brazil";
+ String newHCatDependency2 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120430;country=usa";
+ String pushMissingDeps = newHCatDependency1 +
CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+ PartitionDependencyManagerService pdms =
Services.get().get(PartitionDependencyManagerService.class);
+
+ CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+ CoordinatorJob.Status.RUNNING, false, true);
+
+ CoordinatorActionBean action1 =
addRecordToCoordActionTableForWaiting(job.getId(), 1,
+ CoordinatorAction.Status.WAITING,
"coord-action-for-action-input-check.xml", null, pushMissingDeps,
+ "Z");
+
+ String newHCatDependency3 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120430;country=russia";
+ CoordinatorActionBean action2 =
addRecordToCoordActionTableForWaiting(job.getId(), 2,
+ CoordinatorAction.Status.WAITING,
"coord-action-for-action-input-check.xml", null,
+ newHCatDependency3, "Z");
+
+ HCatURI hcatURI1, hcatURI2, hcatURI3;
+ hcatURI1 = new HCatURI(newHCatDependency1);
+ hcatURI2 = new HCatURI(newHCatDependency2);
+ hcatURI3 = new HCatURI(newHCatDependency3);
+
+ pdms.addMissingDependency(hcatURI1, action1.getId());
+ pdms.addMissingDependency(hcatURI2, action1.getId());
+ pdms.addMissingDependency(hcatURI3, action2.getId());
+ assertTrue(pdms.getWaitingActions(new
HCatURI(newHCatDependency1)).contains(action1.getId()));
+ assertTrue(pdms.getWaitingActions(new
HCatURI(newHCatDependency2)).contains(action1.getId()));
+ assertTrue(pdms.getWaitingActions(new
HCatURI(newHCatDependency3)).contains(action2.getId()));
+ new CoordKillXCommand(job.getId()).call();
+ assertNull(pdms.getWaitingActions(new
HCatURI(newHCatDependency1)));
+ assertNull(pdms.getWaitingActions(new
HCatURI(newHCatDependency2)));
+ assertNull(pdms.getWaitingActions(new
HCatURI(newHCatDependency3)));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ }
+
}
Modified: oozie/branches/branch-4.0/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/release-log.txt?rev=1455384&r1=1455383&r2=1455384&view=diff
==============================================================================
--- oozie/branches/branch-4.0/release-log.txt (original)
+++ oozie/branches/branch-4.0/release-log.txt Tue Mar 12 01:21:17 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.0.0 (unreleased)
+OOZIE-1261 Registered push dependencies are not removed on Coord Kill command
(virag)
OOZIE-1191 add examples of coordinator with SLA tag inserted (ryota via mona)
OOZIE-1204 Illustrate correct use of parameters inside SLA tags (jun aoki via
mona)
OOZIE-1255 latest/future check for hcat can cause shutdown to hang (rohini via
virag)