OOZIE-2770 Show missing dependencies for coord actions
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9035d91c Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9035d91c Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9035d91c Branch: refs/heads/oya Commit: 9035d91cf352ad2e3328feef6e04f067e8e2bc8c Parents: ff21b99 Author: puru <[email protected]> Authored: Sun Jan 29 19:07:39 2017 -0800 Committer: puru <[email protected]> Committed: Sun Jan 29 19:07:39 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/oozie/cli/OozieCLI.java | 19 ++ .../org/apache/oozie/client/OozieClient.java | 100 ++++++- .../org/apache/oozie/client/rest/JsonTags.java | 5 + .../apache/oozie/client/rest/RestConstants.java | 2 + .../org/apache/oozie/CoordinatorEngine.java | 14 + .../main/java/org/apache/oozie/ErrorCode.java | 3 +- .../oozie/command/SchemaCheckXCommand.java | 4 +- .../CoordActionMissingDependenciesXCommand.java | 132 +++++++++ .../oozie/command/coord/CoordCommandUtils.java | 20 +- .../command/coord/CoordSLAAlertsXCommand.java | 15 +- .../java/org/apache/oozie/coord/CoordUtils.java | 30 +- .../AbstractCoordInputDependency.java | 15 +- .../input/dependency/CoordInputDependency.java | 20 ++ .../dependency/CoordOldInputDependency.java | 125 +++++++- .../dependency/CoordPullInputDependency.java | 19 ++ .../oozie/dependency/ActionDependency.java | 5 + .../executor/jpa/CoordActionQueryExecutor.java | 22 +- .../apache/oozie/servlet/BaseJobServlet.java | 21 +- .../org/apache/oozie/servlet/V0JobServlet.java | 26 +- .../org/apache/oozie/servlet/V1JobServlet.java | 29 +- .../org/apache/oozie/servlet/V2JobServlet.java | 44 +++ .../apache/oozie/sla/SLACalculatorMemory.java | 2 +- .../main/java/org/apache/oozie/util/Pair.java | 6 +- .../oozie/TestCoordinatorEngineSimple.java | 4 +- .../org/apache/oozie/client/TestOozieCLI.java | 16 ++ ...tCoordActionMissingDependenciesXCommand.java | 288 +++++++++++++++++++ .../input/logic/TestCoordInputLogicPush.java | 59 ++-- .../servlet/MockCoordinatorEngineService.java | 10 + .../coord-multiple-output-instance5.xml | 108 +++++++ docs/src/site/twiki/DG_CommandLineTool.twiki | 25 ++ docs/src/site/twiki/WebServicesAPI.twiki | 49 ++++ release-log.txt | 1 + webapp/src/main/webapp/oozie-console.js | 125 +++++++- 33 files changed, 1261 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/client/src/main/java/org/apache/oozie/cli/OozieCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java index 6e30d7e..f78c3aa 100644 --- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java +++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java @@ -181,6 +181,8 @@ public class OozieCLI { public static final String WORKFLOW_ACTIONS_RETRIES = "retries"; + public static final String COORD_ACTION_MISSING_DEPENDENCIES = "missingdeps"; + private static final String[] OOZIE_HELP = { "the env variable '" + ENV_OOZIE_URL + "' is used as default value for the '-" + OOZIE_OPTION + "' option", "the env variable '" + ENV_OOZIE_TIME_ZONE + "' is used as default value for the '-" + TIME_ZONE_OPTION + "' option", @@ -382,6 +384,9 @@ public class OozieCLI { "enables sla alerts for the job and its children"); Option slaChange = new Option(SLA_CHANGE, true, "Update sla param for jobs, supported param are should-start, should-end, nominal-time and max-duration"); + Option coordActionMissingDependencies = new Option(COORD_ACTION_MISSING_DEPENDENCIES, true, + "List missing dependencies of a coord action. To specify multiple actions, use with -action or -date option."); + Option doAs = new Option(DO_AS_OPTION, true, "doAs user, impersonates as the specified user"); @@ -411,6 +416,7 @@ public class OozieCLI { actions.addOption(slaEnableAlert); actions.addOption(slaChange); actions.addOption(workflowActionRetries); + actions.addOption(coordActionMissingDependencies); actions.setRequired(true); Options jobOptions = new Options(); jobOptions.addOption(oozie); @@ -1329,6 +1335,19 @@ public class OozieCLI { wc.getWorkflowActionRetriesInfo(commandLine.getOptionValue(WORKFLOW_ACTIONS_RETRIES)), commandLine.getOptionValue(WORKFLOW_ACTIONS_RETRIES)); } + else if (options.contains(COORD_ACTION_MISSING_DEPENDENCIES)) { + String actions = null, dates = null; + + if (options.contains(ACTION_OPTION)) { + actions = commandLine.getOptionValue(ACTION_OPTION); + } + + if (options.contains(DATE_OPTION)) { + dates = commandLine.getOptionValue(DATE_OPTION); + } + wc.getCoordActionMissingDependencies(commandLine.getOptionValue(COORD_ACTION_MISSING_DEPENDENCIES), + actions, dates, System.out); + } } catch (OozieClientException ex) { http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/client/src/main/java/org/apache/oozie/client/OozieClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java b/client/src/main/java/org/apache/oozie/client/OozieClient.java index a107c4a..84f41c9 100644 --- a/client/src/main/java/org/apache/oozie/client/OozieClient.java +++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java @@ -35,6 +35,7 @@ import javax.xml.transform.Transformer; import javax.xml.transform.TransformerFactory; import javax.xml.transform.dom.DOMSource; import javax.xml.transform.stream.StreamResult; + import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; @@ -47,6 +48,7 @@ import java.io.Reader; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -592,6 +594,7 @@ public class OozieClient { } protected abstract T call(HttpURLConnection conn) throws IOException, OozieClientException; + } protected abstract class MapClientCallable extends ClientCallable<Map<String, String>> { @@ -1231,6 +1234,99 @@ public class OozieClient { return new JobDefinition(jobId).call(); } + /** + * Get coord action missing dependencies + * @param jobId + * @param actionList + * @param dates + * @throws OozieClientException + */ + public void getCoordActionMissingDependencies(String jobId, String actionList, String dates, PrintStream ps) + throws OozieClientException { + new CoordActionMissingDependencies(jobId, actionList, dates, ps).call(); + } + + /** + * Get coord action missing dependencies + * @param jobId + * @param actionList + * @param dates + * @throws OozieClientException + */ + public void getCoordActionMissingDependencies(String jobId, String actionList, String dates) + throws OozieClientException { + new CoordActionMissingDependencies(jobId, actionList, dates).call(); + } + + private class CoordActionMissingDependencies extends ClientCallable<String> { + PrintStream printStream; + public CoordActionMissingDependencies(String jobId, String actionList, String dates, PrintStream ps) { + super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM, + RestConstants.COORD_ACTION_MISSING_DEPENDENCIES, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, actionList, + RestConstants.JOB_COORD_SCOPE_DATE, dates)); + this.printStream = ps; + } + + public CoordActionMissingDependencies(String jobId, String actionList, String dates) { + this(jobId, actionList, dates, System.out); + } + + + @SuppressWarnings("unchecked") + protected String call(HttpURLConnection conn) throws IOException, OozieClientException { + if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { + Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8); + JSONObject json = (JSONObject) JSONValue.parse(reader); + if (json != null) { + JSONArray inputDependencies = (JSONArray) json.get(JsonTags.COORD_ACTION_MISSING_DEPENDENCIES); + for (Object dependencies : inputDependencies) { + printStream.println(); + printStream.println("CoordAction : " + + ((JSONObject) dependencies).get(JsonTags.COORDINATOR_ACTION_ID)); + if (((JSONObject) dependencies).get(JsonTags.COORD_ACTION_FIRST_MISSING_DEPENDENCIES) != null) { + printStream.println("Blocked on : " + + ((JSONObject) dependencies) + .get(JsonTags.COORD_ACTION_FIRST_MISSING_DEPENDENCIES)); + } + + if (((JSONObject) dependencies).get(JsonTags.COORDINATOR_ACTION_DATASETS) != null) { + + JSONArray missingDependencies = (JSONArray) ((JSONObject) dependencies) + .get(JsonTags.COORDINATOR_ACTION_DATASETS); + + for (Object missingDependenciesJson : missingDependencies) { + + printStream.println("Dataset : " + + ((JSONObject) missingDependenciesJson) + .get(JsonTags.COORDINATOR_ACTION_DATASET)); + + JSONArray inputDependenciesList = (JSONArray) ((JSONObject) missingDependenciesJson) + .get(JsonTags.COORDINATOR_ACTION_MISSING_DEPS); + printStream.println("Pending Dependencies : "); + + if (inputDependenciesList != null) { + Iterator<String> iterator = inputDependenciesList.iterator(); + while (iterator.hasNext()) { + printStream.println("\t " + iterator.next()); + } + } + printStream.println(); + } + } + } + } + else { + printStream.println(" No missing input dependencies found"); + } + + } + else { + handleError(conn); + } + return null; + } + } + private class JobDefinition extends JobMetadata { JobDefinition(String jobId) { @@ -1268,7 +1364,7 @@ public class OozieClient { InputStreamReader isr = new InputStreamReader(is); try { if (printStream != null) { - sendToOutputStream(isr, -1); + sendToOutputStream(isr, -1, printStream); } else { returnVal = getReaderAsString(isr, -1); @@ -1291,7 +1387,7 @@ public class OozieClient { * @param maxLen max content length allowed, if -1 there is no limit. * @throws IOException */ - private void sendToOutputStream(Reader reader, int maxLen) throws IOException { + private void sendToOutputStream(Reader reader, int maxLen, PrintStream printStream) throws IOException { notNull(reader, "reader"); StringBuilder sb = new StringBuilder(); char[] buffer = new char[2048]; http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java index d670142..8f220b8 100644 --- a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java +++ b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java @@ -140,6 +140,8 @@ public interface JsonTags { String COORDINATOR_ACTIONS = "actions"; String COORDINATOR_ACTION_DATA = "data"; String COORDINATOR_JOB_DATA = "data"; + String COORDINATOR_ACTION_DATASETS = "dataSets"; + String COORDINATOR_ACTION_DATASET = "dataSet"; String BUNDLE_JOB_ID = "bundleJobId"; String BUNDLE_JOB_NAME = "bundleJobName"; @@ -246,5 +248,8 @@ public interface JsonTags { String STATUS = "status"; String ACTION_ATTEMPT = "attempt"; String VALIDATE = "validate"; + String COORD_ACTION_MISSING_DEPENDENCIES = "missingDependencies"; + String COORD_ACTION_FIRST_MISSING_DEPENDENCIES = "blockedOn"; + } http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java index 9a3be97..4e38b4a 100644 --- a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java +++ b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java @@ -217,4 +217,6 @@ public interface RestConstants { String USER_PARAM = "user"; + public static final String COORD_ACTION_MISSING_DEPENDENCIES = "missing-dependencies"; + } http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/CoordinatorEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java index 91fe5a1..2f9f822 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java @@ -49,6 +49,7 @@ import org.apache.oozie.command.coord.CoordActionInfoXCommand; import org.apache.oozie.command.coord.CoordActionsIgnoreXCommand; import org.apache.oozie.command.coord.CoordActionsKillXCommand; import org.apache.oozie.command.coord.CoordChangeXCommand; +import org.apache.oozie.command.coord.CoordActionMissingDependenciesXCommand; import org.apache.oozie.command.coord.CoordJobXCommand; import org.apache.oozie.command.coord.CoordJobsXCommand; import org.apache.oozie.command.coord.CoordKillXCommand; @@ -60,6 +61,7 @@ import org.apache.oozie.command.coord.CoordSLAChangeXCommand; import org.apache.oozie.command.coord.CoordSubmitXCommand; import org.apache.oozie.command.coord.CoordSuspendXCommand; import org.apache.oozie.command.coord.CoordUpdateXCommand; +import org.apache.oozie.dependency.ActionDependency; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; @@ -984,4 +986,16 @@ public class CoordinatorEngine extends BaseEngine { throw new CoordinatorEngineException(ex); } } + /** + * Get coord action missing dependencies + * @param id jobID + * @param actions action list + * @param dates nominal time list + * @return pair of coord action bean and list of missing input dependencies. + * @throws CommandException + */ + public List<Pair<CoordinatorActionBean, Map<String, ActionDependency>>> getCoordActionMissingDependencies(String id, + String actions, String dates) throws CommandException { + return new CoordActionMissingDependenciesXCommand(id, actions, dates).call(); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/ErrorCode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index 9a843bd..ed1ca60 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -217,8 +217,7 @@ public enum ErrorCode { E1026(XLog.STD, "SLA alert update command failed: {0}"), E1027(XLog.STD, "SLA change command failed. {0}"), E1028(XLog.STD, "Coord input logic error. {0}"), - - + E1029(XLog.STD, "Coord action missing dependencies error. {0}"), E1100(XLog.STD, "Command precondition does not hold before execution, [{0}]"), http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java index 1cc086e..d1ea294 100644 --- a/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java @@ -195,10 +195,10 @@ public class SchemaCheckXCommand extends XCommand<Void> { problem = true; } else { for (Map.Entry<String, Integer> ent : expectedColumnTypes.entrySet()) { - if (!foundColumns.get(ent.getKey()).getFist().equals(ent.getValue())) { + if (!foundColumns.get(ent.getKey()).getFirst().equals(ent.getValue())) { LOG.error("Expected column [{0}] in table [{1}] to have type [{2}], but found type [{3}]", ent.getKey(), table, getSQLTypeFromInt(ent.getValue()), - getSQLTypeFromInt(foundColumns.get(ent.getKey()).getFist())); + getSQLTypeFromInt(foundColumns.get(ent.getKey()).getFirst())); problem = true; } else if (foundColumns.get(ent.getKey()).getSecond() != null) { LOG.error("Expected column [{0}] in table [{1}] to have default value [NULL], but found default vale [{2}]", http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/command/coord/CoordActionMissingDependenciesXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionMissingDependenciesXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionMissingDependenciesXCommand.java new file mode 100644 index 0000000..d37cfe5 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionMissingDependenciesXCommand.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.coord; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.command.XCommand; +import org.apache.oozie.coord.CoordUtils; +import org.apache.oozie.coord.input.dependency.CoordInputDependency; +import org.apache.oozie.dependency.ActionDependency; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.util.Pair; + +public class CoordActionMissingDependenciesXCommand + extends XCommand<List<Pair<CoordinatorActionBean, Map<String, ActionDependency>>>> { + + private String actions; + private String dates; + private String jobId; + private List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>(); + + public CoordActionMissingDependenciesXCommand(String id, String actions, String dates) { + super("CoordActionMissingDependenciesXCommand", "CoordActionMissingDependenciesXCommand", 1); + this.jobId = id; + this.actions = actions; + this.dates = dates; + + if (id.contains("@")) { + this.jobId = id.substring(0, id.indexOf("@")); + this.actions = id.substring(id.indexOf("@") + 1); + } + } + + public CoordActionMissingDependenciesXCommand(String id) { + this(id, null, null); + } + + @Override + protected boolean isLockRequired() { + return false; + } + + @Override + public String getEntityKey() { + return null; + } + + @Override + protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { + if (actions == null && dates == null) { + throw new CommandException(ErrorCode.E1029, "Action(s) are missing."); + } + } + + @Override + protected void loadState() throws CommandException { + String actionId = null; + + try { + List<String> actionIds = CoordUtils.getActionListForScopeAndDate(jobId, actions, dates); + for (String id : actionIds) { + actionId = id; + coordActions.add(CoordActionQueryExecutor.getInstance() + .get(CoordActionQuery.GET_COORD_ACTION_FOR_INPUTCHECK, actionId)); + } + } + catch (JPAExecutorException e) { + if (e.getErrorCode().equals(ErrorCode.E0605)) { + throw new CommandException(ErrorCode.E0605, actionId); + } + else { + throw new CommandException(ErrorCode.E1029, e); + } + } + + } + + @Override + protected void verifyPrecondition() throws CommandException, PreconditionException { + + } + + @Override + protected List<Pair<CoordinatorActionBean, Map<String, ActionDependency>>> execute() throws CommandException { + + List<Pair<CoordinatorActionBean, Map<String, ActionDependency>>> inputDependenciesListPair = + new ArrayList<Pair<CoordinatorActionBean, Map<String, ActionDependency>>>(); + try { + + for (CoordinatorActionBean coordAction : coordActions) { + CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies(); + CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies(); + Map<String, ActionDependency> dependencyMap = new HashMap<String, ActionDependency>(); + dependencyMap.putAll(coordPullInputDependency.getMissingDependencies(coordAction)); + dependencyMap.putAll(coordPushInputDependency.getMissingDependencies(coordAction)); + + inputDependenciesListPair.add( + new Pair<CoordinatorActionBean, Map<String, ActionDependency>>(coordAction, dependencyMap)); + } + } + catch (Exception e) { + throw new CommandException(ErrorCode.E1028, e.getMessage(), e); + } + + return inputDependenciesListPair; + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java b/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java index 63287b9..3a7a930 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java @@ -25,8 +25,6 @@ import java.net.URISyntaxException; import java.text.ParseException; import java.util.ArrayList; import java.util.TimeZone; -import java.util.Map; -import java.util.HashMap; import java.util.List; import java.util.Date; import java.util.Calendar; @@ -58,6 +56,7 @@ import org.apache.oozie.service.URIHandlerService; import org.apache.oozie.service.UUIDService; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.ELEvaluator; +import org.apache.oozie.util.Pair; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; @@ -737,7 +736,7 @@ public class CoordCommandUtils { .createPullInputDependencies(isInputLogicSpecified); CoordInputDependency coordPushInputDependency = CoordInputDependencyFactory .createPushInputDependencies(isInputLogicSpecified); - Map<String, String> unresolvedList = new HashMap<String, String>(); + List<Pair<String, String>> unresolvedList = new ArrayList<Pair<String, String>>(); URIHandlerService uriService = Services.get().get(URIHandlerService.class); @@ -776,11 +775,11 @@ public class CoordCommandUtils { String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INSTANCES_TAG, event.getNamespace()); if (tmpUnresolved != null) { - unresolvedList.put(name, tmpUnresolved); + unresolvedList.add(new Pair<String,String>(name, tmpUnresolved)); } } - for(String unresolvedDatasetName:unresolvedList.keySet()){ - coordPullInputDependency.addUnResolvedList(unresolvedDatasetName, unresolvedList.get(unresolvedDatasetName)); + for (Pair<String, String> unresolvedDataset : unresolvedList) { + coordPullInputDependency.addUnResolvedList(unresolvedDataset.getFirst(), unresolvedDataset.getSecond()); } actionBean.setPullInputDependencies(coordPullInputDependency); actionBean.setPushInputDependencies(coordPushInputDependency); @@ -946,4 +945,13 @@ public class CoordCommandUtils { return pathExists(sPath, actionConf, user); } + public static String getFirstMissingDependency(CoordinatorActionBean coordAction) { + CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies(); + CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies(); + String firstMissingDependencies = coordPullInputDependency.getFirstMissingDependency(); + if (StringUtils.isEmpty(firstMissingDependencies)) { + firstMissingDependencies = coordPushInputDependency.getFirstMissingDependency(); + } + return firstMissingDependencies; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java index b8affd6..96be7ad 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java @@ -170,23 +170,10 @@ public abstract class CoordSLAAlertsXCommand extends SLAAlertsXCommand { * @throws CommandException the command exception */ private List<String> getActionListForScopeAndDate(String id, String scope, String dates) throws CommandException { - List<String> actionIds = new ArrayList<String>(); - if (scope == null && dates == null) { return null; } - List<String> parsed = new ArrayList<String>(); - if (dates != null) { - List<CoordinatorActionBean> actionSet = CoordUtils.getCoordActionsFromDates(id, dates, true); - for (CoordinatorActionBean action : actionSet) { - actionIds.add(action.getId()); - } - parsed.addAll(actionIds); - } - if (scope != null) { - parsed.addAll(CoordUtils.getActionsIds(id, scope)); - } - return parsed; + return CoordUtils.getActionListForScopeAndDate(id, scope, dates); } /** http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/coord/CoordUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/CoordUtils.java b/core/src/main/java/org/apache/oozie/coord/CoordUtils.java index 82f9bed..5425d86 100644 --- a/core/src/main/java/org/apache/oozie/coord/CoordUtils.java +++ b/core/src/main/java/org/apache/oozie/coord/CoordUtils.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.Map; @@ -100,6 +101,29 @@ public class CoordUtils { return coordActions; } + public static List<String> getActionListForScopeAndDate(String id, String scope, String dates) throws CommandException { + List<String> actionIds = new ArrayList<String>(); + + List<String> parsed = new ArrayList<String>(); + if (scope == null && dates == null) { + parsed.add(id); + return parsed; + } + + if (dates != null) { + List<CoordinatorActionBean> actionSet = CoordUtils.getCoordActionsFromDates(id, dates, true); + for (CoordinatorActionBean action : actionSet) { + actionIds.add(action.getId()); + } + parsed.addAll(actionIds); + } + if (scope != null) { + parsed.addAll(CoordUtils.getActionsIds(id, scope)); + } + return parsed; + } + + /** * Get the list of actions for given date ranges * @@ -115,7 +139,7 @@ public class CoordUtils { ParamChecker.notEmpty(jobId, "jobId"); ParamChecker.notEmpty(scope, "scope"); - Set<CoordinatorActionBean> actionSet = new HashSet<CoordinatorActionBean>(); + Set<CoordinatorActionBean> actionSet = new LinkedHashSet<CoordinatorActionBean>(); String[] list = scope.split(","); for (String s : list) { s = s.trim(); @@ -169,7 +193,7 @@ public class CoordUtils { ParamChecker.notEmpty(jobId, "jobId"); ParamChecker.notEmpty(scope, "scope"); - Set<String> actions = new HashSet<String>(); + Set<String> actions = new LinkedHashSet<String>(); String[] list = scope.split(","); for (String s : list) { s = s.trim(); @@ -360,7 +384,7 @@ public class CoordUtils { Map<String, Object> params = new HashMap<String, Object>(); int pcnt= 1; for (Map.Entry<Pair<String, CoordinatorEngine.FILTER_COMPARATORS>, List<Object>> filter : filterMap.entrySet()) { - String field = filter.getKey().getFist(); + String field = filter.getKey().getFirst(); CoordinatorEngine.FILTER_COMPARATORS comp = filter.getKey().getSecond(); String sqlField; if (field.equals(OozieClient.FILTER_STATUS)) { http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java index 0da60ec..ace120d 100644 --- a/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java @@ -294,7 +294,6 @@ public abstract class AbstractCoordInputDependency implements Writable, CoordInp WritableUtils.writeStringAsBytes(out,INTERNAL_VERSION_ID); out.writeBoolean(isDependencyMet); WritableUtils.writeMapWithList(out, dependencyMap); - } @Override @@ -312,4 +311,18 @@ public abstract class AbstractCoordInputDependency implements Writable, CoordInp return getAvailableDependencies(dataSet).size() == getDependencyMap().get(dataSet).size(); } + @Override + public Map<String, ActionDependency> getMissingDependencies(CoordinatorActionBean coordAction) + throws CommandException, IOException, JDOMException { + Map<String, ActionDependency> missingDependenciesMap = new HashMap<String, ActionDependency>(); + for (String key : missingDependenciesSet.keySet()) { + missingDependenciesMap.put(key, new ActionDependency(missingDependenciesSet.get(key), new ArrayList<String>())); + } + return missingDependenciesMap; + } + + public String getFirstMissingDependency() { + return null; + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java index 504bc3d..4a2a0b0 100644 --- a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java @@ -21,6 +21,7 @@ package org.apache.oozie.coord.input.dependency; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Map; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.command.CommandException; @@ -134,6 +135,25 @@ public interface CoordInputDependency { boolean registerForNotification) throws CommandException, IOException, JDOMException; /** + * Gets the missing dependencies. + * + * @param coordAction the coord action + * @return the missing dependencies + * @throws CommandException the command exception + * @throws IOException Signals that an I/O exception has occurred. + * @throws JDOMException the JDOM exception + */ + public Map<String, ActionDependency> getMissingDependencies(CoordinatorActionBean coordAction) + throws CommandException, IOException, JDOMException; + + /** + * Gets the first missing dependency. + * + * @return the first missing dependency + */ + public String getFirstMissingDependency(); + + /** * Check pull missing dependencies. * * @param coordAction the coord action http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java index aabd2bf..b0b6475 100644 --- a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java @@ -25,7 +25,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -38,9 +43,13 @@ import org.apache.oozie.command.coord.CoordCommandUtils; import org.apache.oozie.coord.CoordELConstants; import org.apache.oozie.coord.CoordELEvaluator; import org.apache.oozie.coord.CoordELFunctions; +import org.apache.oozie.coord.CoordUtils; import org.apache.oozie.dependency.ActionDependency; import org.apache.oozie.dependency.DependencyChecker; +import org.apache.oozie.dependency.URIHandler; import org.apache.oozie.dependency.URIHandlerException; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.URIHandlerService; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.ELEvaluator; import org.apache.oozie.util.ParamChecker; @@ -52,7 +61,6 @@ import org.jdom.JDOMException; /** * Old approach where dependencies are stored as String. - * */ public class CoordOldInputDependency implements CoordInputDependency { @@ -153,8 +161,8 @@ public class CoordOldInputDependency implements CoordInputDependency { if (StringUtils.isEmpty(missingDependencies)) { return; } - List<String> missingDependenciesList = new ArrayList<String>(Arrays.asList((DependencyChecker - .dependenciesAsArray(missingDependencies)))); + List<String> missingDependenciesList = new ArrayList<String>( + Arrays.asList((DependencyChecker.dependenciesAsArray(missingDependencies)))); missingDependenciesList.removeAll(availableList); missingDependencies = DependencyChecker.dependenciesAsString(missingDependenciesList); @@ -178,8 +186,8 @@ public class CoordOldInputDependency implements CoordInputDependency { public ActionDependency checkPushMissingDependencies(CoordinatorActionBean coordAction, boolean registerForNotification) throws CommandException, IOException { - return DependencyChecker.checkForAvailability(getMissingDependenciesAsList(), new XConfiguration( - new StringReader(coordAction.getRunConf())), !registerForNotification); + return DependencyChecker.checkForAvailability(getMissingDependenciesAsList(), + new XConfiguration(new StringReader(coordAction.getRunConf())), !registerForNotification); } private boolean checkListOfPaths(CoordinatorActionBean coordAction, StringBuilder existList, @@ -252,13 +260,12 @@ public class CoordOldInputDependency implements CoordInputDependency { } @SuppressWarnings("unchecked") - public boolean checkUnresolved(CoordinatorActionBean coordAction, Element eAction) - throws Exception { + public boolean checkUnresolved(CoordinatorActionBean coordAction, Element eAction) throws Exception { Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time")); String actualTimeStr = eAction.getAttributeValue("action-actual-time"); Element inputList = eAction.getChild("input-events", eAction.getNamespace()); - if(inputList==null){ + if (inputList == null) { return true; } @@ -279,8 +286,8 @@ public class CoordOldInputDependency implements CoordInputDependency { continue; } ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, actionConf); - String unResolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, - dEvent.getNamespace()).getTextTrim(); + String unResolvedInstance = dEvent + .getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()).getTextTrim(); String unresolvedList[] = unResolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR); StringBuffer resolvedTmp = new StringBuffer(); for (int i = 0; i < unresolvedList.length; i++) { @@ -297,8 +304,8 @@ public class CoordOldInputDependency implements CoordInputDependency { } if (resolvedTmp.length() > 0) { if (dEvent.getChild("uris", dEvent.getNamespace()) != null) { - resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append( - dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim()); + resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR) + .append(dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim()); dEvent.removeChild("uris", dEvent.getNamespace()); } Element uriInstance = new Element("uris", dEvent.getNamespace()); @@ -312,4 +319,98 @@ public class CoordOldInputDependency implements CoordInputDependency { return true; } + public Map<String, ActionDependency> getMissingDependencies(CoordinatorActionBean coordAction) + throws CommandException, IOException, JDOMException { + + Map<String, ActionDependency> dependenciesMap = null; + try { + dependenciesMap = getDependency(coordAction); + } + catch (URIHandlerException e) { + throw new IOException(e); + } + + StringBuilder nonExistList = new StringBuilder(); + StringBuilder nonResolvedList = new StringBuilder(); + CoordCommandUtils.getResolvedList(getMissingDependencies(), nonExistList, nonResolvedList); + + Set<String> missingSets = new HashSet<String>( + Arrays.asList(nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR))); + + missingSets.addAll( + Arrays.asList(nonResolvedList.toString().split(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR))); + + for (Iterator<Map.Entry<String, ActionDependency>> it = dependenciesMap.entrySet().iterator(); it.hasNext();) { + Map.Entry<String, ActionDependency> entry = it.next(); + ActionDependency dependency = entry.getValue(); + dependency.getMissingDependencies().retainAll(missingSets); + if (dependenciesMap.get(entry.getKey()).getMissingDependencies().isEmpty()) { + it.remove(); + } + } + return dependenciesMap; + } + + @SuppressWarnings("unchecked") + private Map<String, ActionDependency> getDependency(CoordinatorActionBean coordAction) + throws JDOMException, URIHandlerException { + Map<String, ActionDependency> dependenciesMap = new HashMap<String, ActionDependency>(); + URIHandlerService uriService = Services.get().get(URIHandlerService.class); + + Element eAction = XmlUtils.parseXml(coordAction.getActionXml()); + Element inputList = eAction.getChild("input-events", eAction.getNamespace()); + List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace()); + for (Element event : eDataEvents) { + Element uri = event.getChild("uris", event.getNamespace()); + ActionDependency dependency = new ActionDependency(); + if (uri != null) { + Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag", + event.getNamespace()); + String[] dataSets = uri.getText().split(CoordELFunctions.INSTANCE_SEPARATOR); + String doneFlag = CoordUtils.getDoneFlag(doneFlagElement); + + for (String dataSet : dataSets) { + URIHandler uriHandler; + uriHandler = uriService.getURIHandler(dataSet); + dependency.getMissingDependencies().add(uriHandler.getURIWithDoneFlag(dataSet, doneFlag)); + } + } + if (event.getChildTextTrim(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, event.getNamespace()) != null) { + dependency.getMissingDependencies() + .addAll(getUnResolvedDependency(coordAction, event).getMissingDependencies()); + } + dependenciesMap.put(event.getAttributeValue("name"), dependency); + } + return dependenciesMap; + } + + private ActionDependency getUnResolvedDependency(CoordinatorActionBean coordAction, Element event) + throws JDOMException, URIHandlerException { + String tmpUnresolved = event.getChildTextTrim(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, event.getNamespace()); + ActionDependency dependency = new ActionDependency(); + StringBuilder nonResolvedList = new StringBuilder(); + CoordCommandUtils.getResolvedList(getMissingDependencies(), new StringBuilder(), nonResolvedList); + if (nonResolvedList.length() > 0) { + dependency.getMissingDependencies().add(tmpUnresolved); + } + return dependency; + } + + @Override + public String getFirstMissingDependency() { + StringBuilder nonExistList = new StringBuilder(); + String missingDependencies = getMissingDependencies(); + StringBuilder nonResolvedList = new StringBuilder(); + CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList); + String firstMissingDependency = ""; + if (nonExistList.length() > 0) { + firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0]; + } + else { + if (nonResolvedList.length() > 0) { + firstMissingDependency = nonResolvedList.toString().split(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR)[0]; + } + } + return firstMissingDependency; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java index f20dcae..358f6f9 100644 --- a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java @@ -30,9 +30,13 @@ import java.util.List; import java.util.Map; import org.apache.commons.lang.StringUtils; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.command.CommandException; import org.apache.oozie.command.coord.CoordCommandUtils; import org.apache.oozie.coord.CoordELFunctions; +import org.apache.oozie.dependency.ActionDependency; import org.apache.oozie.util.WritableUtils; +import org.jdom.JDOMException; public class CoordPullInputDependency extends AbstractCoordInputDependency { private Map<String, CoordUnResolvedInputDependency> unResolvedList = new HashMap<String, CoordUnResolvedInputDependency>(); @@ -148,4 +152,19 @@ public class CoordPullInputDependency extends AbstractCoordInputDependency { return super.isDataSetResolved(dataSet); } } + + @Override + public Map<String, ActionDependency> getMissingDependencies(CoordinatorActionBean coordAction) + throws CommandException, IOException, JDOMException { + Map<String, ActionDependency> missingDependenciesMap = new HashMap<String, ActionDependency>(); + missingDependenciesMap.putAll(super.getMissingDependencies(coordAction)); + + for (String key : unResolvedList.keySet()) { + if (!unResolvedList.get(key).isResolved()) { + missingDependenciesMap.put(key, + new ActionDependency(unResolvedList.get(key).getDependencies(), new ArrayList<String>())); + } + } + return missingDependenciesMap; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java b/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java index fe7a327..cc99038 100644 --- a/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java +++ b/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java @@ -18,6 +18,7 @@ package org.apache.oozie.dependency; +import java.util.ArrayList; import java.util.List; public class ActionDependency { @@ -25,6 +26,10 @@ public class ActionDependency { private List<String> missingDependencies; private List<String> availableDependencies; + public ActionDependency() { + this(new ArrayList<String>(), new ArrayList<String>()); + } + public ActionDependency(List<String> missingDependencies, List<String> availableDependencies) { this.missingDependencies = missingDependencies; this.availableDependencies = availableDependencies; http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java index c0e6c19..2c02998 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java @@ -29,8 +29,10 @@ import javax.persistence.Query; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.StringBlob; +import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; +import org.apache.oozie.util.DateUtils; /** * Query Executor that provides API to run query for Coordinator Action @@ -60,7 +62,8 @@ public class CoordActionQueryExecutor extends GET_ACTIVE_ACTIONS_FOR_DATES, GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN, - GET_COORD_ACTION_FOR_SLA + GET_COORD_ACTION_FOR_SLA, + GET_COORD_ACTION_FOR_INPUTCHECK }; private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor(); @@ -179,6 +182,7 @@ public class CoordActionQueryExecutor extends case GET_COORD_ACTION: case GET_COORD_ACTION_STATUS: case GET_COORD_ACTION_FOR_SLA: + case GET_COORD_ACTION_FOR_INPUTCHECK: query.setParameter("id", parameters[0]); break; case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: @@ -341,6 +345,22 @@ public class CoordActionQueryExecutor extends bean.setExternalId((String) arr[3]); bean.setLastModifiedTime((Timestamp) arr[4]); break; + case GET_COORD_ACTION_FOR_INPUTCHECK: + arr = (Object[]) ret; + bean = new CoordinatorActionBean(); + bean.setId((String) arr[0]); + bean.setActionNumber((Integer) arr[1]); + bean.setJobId((String) arr[2]); + bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[3])); + bean.setRunConfBlob((StringBlob) arr[4]); + bean.setNominalTime(DateUtils.toDate((Timestamp) arr[5])); + bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[6])); + bean.setActionXmlBlob((StringBlob) arr[7]); + bean.setMissingDependenciesBlob((StringBlob) arr[8]); + bean.setPushMissingDependenciesBlob((StringBlob) arr[9]); + bean.setTimeOut((Integer) arr[10]); + bean.setExternalId((String) arr[11]); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java index 87a2b42..03acbc1 100644 --- a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java @@ -361,7 +361,14 @@ public abstract class BaseJobServlet extends JsonRestServlet { json.put(JsonTags.WORKFLOW_ACTION_RETRIES, retries); startCron(); sendJsonResponse(response, HttpServletResponse.SC_OK, json); - } else { + } + else if (show.equals(RestConstants.COORD_ACTION_MISSING_DEPENDENCIES)) { + stopCron(); + JSONObject json = getCoordActionMissingDependencies(request, response); + startCron(); + sendJsonResponse(response, HttpServletResponse.SC_OK, json); + } + else { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.JOB_SHOW_PARAM, show); } @@ -588,4 +595,16 @@ public abstract class BaseJobServlet extends JsonRestServlet { */ abstract JSONArray getActionRetries(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException; + + /** + * Abstract method to get the coord action missing dependencies. + * + * @param request the request + * @param response the response + * @return the coord input dependencies + * @throws XServletException the x servlet exception + * @throws IOException Signals that an I/O exception has occurred. + */ + abstract JSONObject getCoordActionMissingDependencies(HttpServletRequest request, HttpServletResponse response) + throws XServletException, IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java index 0c42128..d3b4689 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java @@ -38,6 +38,8 @@ public class V0JobServlet extends BaseJobServlet { private static final String INSTRUMENTATION_NAME = "v0job"; + final static String NOT_SUPPORTED_MESSAGE = "Not supported in v0"; + public V0JobServlet() { super(INSTRUMENTATION_NAME); } @@ -226,45 +228,53 @@ public class V0JobServlet extends BaseJobServlet { @Override protected JSONObject getJobsByParentId(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override JSONArray getActionRetries(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } + + @Override + protected JSONObject getCoordActionMissingDependencies(HttpServletRequest request, HttpServletResponse response) + throws XServletException, IOException { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); + + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java index 95dcca6..9356768 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java @@ -50,6 +50,9 @@ public class V1JobServlet extends BaseJobServlet { private static final String INSTRUMENTATION_NAME = "v1job"; public static final String COORD_ACTIONS_DEFAULT_LENGTH = "oozie.coord.actions.default.length"; + final static String NOT_SUPPORTED_MESSAGE = "Not supported in v1"; + + public V1JobServlet() { super(INSTRUMENTATION_NAME); } @@ -181,7 +184,7 @@ public class V1JobServlet extends BaseJobServlet { } @Override protected JSONObject ignoreJob(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } /* @@ -1041,7 +1044,7 @@ public class V1JobServlet extends BaseJobServlet { @Override protected String getJMSTopicName(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override @@ -1089,44 +1092,50 @@ public class V1JobServlet extends BaseJobServlet { @Override protected JSONObject updateJob(HttpServletRequest request, HttpServletResponse response, Configuration conf) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override protected void streamJobErrorLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override protected void streamJobAuditLog(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); + } + + @Override + JSONObject getCoordActionMissingDependencies(HttpServletRequest request, HttpServletResponse response) + throws XServletException, IOException { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } @Override JSONArray getActionRetries(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java index 3a0ffb0..6c30f5d 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java @@ -30,6 +30,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.BaseEngine; import org.apache.oozie.BaseEngineException; +import org.apache.oozie.BundleEngine; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorActionInfo; import org.apache.oozie.CoordinatorEngine; @@ -42,11 +43,14 @@ import org.apache.oozie.client.rest.JsonBean; import org.apache.oozie.client.rest.JsonTags; import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.coord.CoordCommandUtils; import org.apache.oozie.command.wf.ActionXCommand; +import org.apache.oozie.dependency.ActionDependency; import org.apache.oozie.service.BundleEngineService; import org.apache.oozie.service.CoordinatorEngineService; import org.apache.oozie.service.DagEngineService; import org.apache.oozie.service.Services; +import org.apache.oozie.util.Pair; import org.json.simple.JSONArray; import org.json.simple.JSONObject; @@ -315,6 +319,46 @@ public class V2JobServlet extends V1JobServlet { } } + @SuppressWarnings("unchecked") + @Override + protected JSONObject getCoordActionMissingDependencies(HttpServletRequest request, HttpServletResponse response) + throws XServletException, IOException { + String jobId = getResourceName(request); + String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST); + String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE); + + try { + List<Pair<CoordinatorActionBean, Map<String, ActionDependency>>> dependenciesList = Services.get() + .get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request)) + .getCoordActionMissingDependencies(jobId, actions, dates); + JSONArray dependenciesArray = new JSONArray(); + for (Pair<CoordinatorActionBean, Map<String, ActionDependency>> dependencies : dependenciesList) { + JSONObject json = new JSONObject(); + JSONArray parentJsonArray = new JSONArray(); + + for (String key : dependencies.getSecond().keySet()) { + JSONObject dependencyList = new JSONObject(); + JSONArray jsonArray = new JSONArray(); + jsonArray.addAll(dependencies.getSecond().get(key).getMissingDependencies()); + dependencyList.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, jsonArray); + dependencyList.put(JsonTags.COORDINATOR_ACTION_DATASET, key); + parentJsonArray.add(dependencyList); + } + json.put(JsonTags.COORD_ACTION_FIRST_MISSING_DEPENDENCIES, + CoordCommandUtils.getFirstMissingDependency(dependencies.getFirst())); + json.put(JsonTags.COORDINATOR_ACTION_ID, dependencies.getFirst().getActionNumber()); + json.put(JsonTags.COORDINATOR_ACTION_DATASETS, parentJsonArray); + dependenciesArray.add(json); + } + JSONObject jsonObject = new JSONObject(); + jsonObject.put(JsonTags.COORD_ACTION_MISSING_DEPENDENCIES, dependenciesArray); + return jsonObject; + } + catch (CommandException e) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); + } + } + /** * Gets the base engine based on jobId. * http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java index 3522ffe..347f853 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java @@ -617,7 +617,7 @@ public class SLACalculatorMemory implements SLACalculator { @SuppressWarnings("rawtypes") List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) { - SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist()); + SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFirst()); if (slaCalc != null) { updateParams(slaCalc, jobIdSLAPair.getSecond()); updateDBSlaExpectedValues(slaCalc, updateList); http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/main/java/org/apache/oozie/util/Pair.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/Pair.java b/core/src/main/java/org/apache/oozie/util/Pair.java index 1bf45b4..4ece0d3 100644 --- a/core/src/main/java/org/apache/oozie/util/Pair.java +++ b/core/src/main/java/org/apache/oozie/util/Pair.java @@ -36,10 +36,10 @@ public class Pair<T, S> { public static <T, S> Pair<T, S> of(T first, S second) { - return new Pair(first, second); + return new Pair<T,S>(first, second); } - public T getFist() { + public T getFirst() { return first; } @@ -62,7 +62,7 @@ public class Pair<T, S> { return false; } - Pair otherPair = (Pair)other; + Pair<T, S> otherPair = (Pair<T,S>)other; return (ObjectUtils.equals(first, otherPair.first) && ObjectUtils.equals(second, otherPair.second)); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java index 4a12e42..c408d47 100644 --- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java +++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineSimple.java @@ -57,7 +57,7 @@ public class TestCoordinatorEngineSimple extends XTestCase { assertEquals(1, map.size()); Pair<String, FILTER_COMPARATORS> key = map.keySet().iterator().next(); assertNotNull(key); - assertEquals(OozieClient.FILTER_STATUS, key.getFist()); + assertEquals(OozieClient.FILTER_STATUS, key.getFirst()); assertEquals(FILTER_COMPARATORS.EQUALS, key.getSecond()); List<Object> list = map.get(key); assertNotNull(list); @@ -71,7 +71,7 @@ public class TestCoordinatorEngineSimple extends XTestCase { assertEquals(1, map.size()); key = map.keySet().iterator().next(); assertNotNull(key); - assertEquals(OozieClient.FILTER_NOMINAL_TIME, key.getFist()); + assertEquals(OozieClient.FILTER_NOMINAL_TIME, key.getFirst()); assertEquals(FILTER_COMPARATORS.GREATER_EQUAL, key.getSecond()); list = map.get(key); assertNotNull(list); http://git-wip-us.apache.org/repos/asf/oozie/blob/9035d91c/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java index 8ec38e4..d873b96 100644 --- a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java +++ b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java @@ -31,6 +31,7 @@ import java.util.concurrent.Callable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.oozie.BaseEngine; import org.apache.oozie.BuildInfo; import org.apache.oozie.cli.CLIParser; import org.apache.oozie.cli.OozieCLI; @@ -1626,6 +1627,21 @@ public class TestOozieCLI extends DagServletTestCase { } + public void testCoordActionMissingdependencies() throws Exception { + runTest(END_POINTS, SERVLET_CLASSES, false, new Callable<Void>() { + @Override + public Void call() throws Exception { + HeaderTestingVersionServlet.OOZIE_HEADERS.clear(); + String oozieUrl = getContextURL(); + String[] args = new String[] { "job", "-missingdeps", "aaa-C", "-oozie", oozieUrl }; + assertEquals(0, new OozieCLI().run(args)); + assertEquals(MockCoordinatorEngineService.did, RestConstants.COORD_ACTION_MISSING_DEPENDENCIES); + assertFalse(MockCoordinatorEngineService.started.get(1)); + return null; + } + }); + } + private String runOozieCLIAndGetStdout(String[] args) { PrintStream original = System.out; ByteArrayOutputStream baos = new ByteArrayOutputStream();
