OOZIE-1976 Specifying coordinator input datasets in more logical ways
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/81ce22b6 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/81ce22b6 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/81ce22b6 Branch: refs/heads/master Commit: 81ce22b6f23b2bba49df4733961ee82b58c38d0d Parents: 5abd3e6 Author: Purshotam Shah <[email protected]> Authored: Tue Jan 26 12:46:16 2016 -0800 Committer: Purshotam Shah <[email protected]> Committed: Tue Jan 26 12:46:16 2016 -0800 ---------------------------------------------------------------------- .../main/resources/oozie-coordinator-0.5.xsd | 194 ++++ core/pom.xml | 5 + .../org/apache/oozie/CoordinatorActionBean.java | 46 +- .../main/java/org/apache/oozie/ErrorCode.java | 2 + .../coord/CoordActionInputCheckXCommand.java | 318 ++---- .../CoordActionUpdatePushMissingDependency.java | 30 +- .../oozie/command/coord/CoordCommandUtils.java | 180 ++- .../CoordMaterializeTransitionXCommand.java | 6 +- .../coord/CoordPushDependencyCheckXCommand.java | 73 +- .../command/coord/CoordSubmitXCommand.java | 27 + .../apache/oozie/coord/CoordELConstants.java | 3 + .../apache/oozie/coord/CoordELEvaluator.java | 23 +- .../apache/oozie/coord/CoordELFunctions.java | 49 +- .../java/org/apache/oozie/coord/CoordUtils.java | 22 + .../org/apache/oozie/coord/SyncCoordAction.java | 22 + .../AbstractCoordInputDependency.java | 315 ++++++ .../input/dependency/CoordInputDependency.java | 172 +++ .../dependency/CoordInputDependencyFactory.java | 170 +++ .../input/dependency/CoordInputInstance.java | 83 ++ .../dependency/CoordOldInputDependency.java | 309 +++++ .../dependency/CoordPullInputDependency.java | 151 +++ .../dependency/CoordPushInputDependency.java | 49 + .../CoordUnResolvedInputDependency.java | 92 ++ .../input/logic/CoordInputLogicBuilder.java | 167 +++ .../input/logic/CoordInputLogicEvaluator.java | 44 + .../logic/CoordInputLogicEvaluatorPhaseOne.java | 324 ++++++ .../CoordInputLogicEvaluatorPhaseThree.java | 130 +++ .../logic/CoordInputLogicEvaluatorPhaseTwo.java | 144 +++ .../CoordInputLogicEvaluatorPhaseValidate.java | 89 ++ .../logic/CoordInputLogicEvaluatorResult.java | 104 ++ .../logic/CoordInputLogicEvaluatorUtil.java | 229 ++++ .../coord/input/logic/InputLogicParser.java | 309 +++++ .../coord/input/logic/OozieJexlEngine.java | 47 + .../coord/input/logic/OozieJexlInterpreter.java | 73 ++ .../oozie/dependency/ActionDependency.java | 2 +- .../oozie/dependency/DependencyChecker.java | 15 +- .../apache/oozie/dependency/FSURIHandler.java | 9 + .../apache/oozie/dependency/HCatURIHandler.java | 5 + .../org/apache/oozie/dependency/URIHandler.java | 14 + .../org/apache/oozie/util/WritableUtils.java | 148 ++- core/src/main/resources/oozie-default.xml | 2 +- .../TestCoordActionInputCheckXCommand.java | 9 +- .../input/logic/TestCoordInputLogicPush.java | 645 +++++++++++ .../input/logic/TestCoordinatorInputLogic.java | 1054 ++++++++++++++++++ .../coord/input/logic/TestInputLogicParser.java | 367 ++++++ core/src/test/resources/coord-action-sla.xml | 2 +- .../test/resources/coord-inputlogic-combine.xml | 119 ++ .../test/resources/coord-inputlogic-hcat.xml | 119 ++ .../test/resources/coord-inputlogic-latest.xml | 124 +++ .../resources/coord-inputlogic-range-latest.xml | 130 +++ .../test/resources/coord-inputlogic-range.xml | 107 ++ core/src/test/resources/coord-inputlogic.xml | 126 +++ pom.xml | 7 + release-log.txt | 1 + 54 files changed, 6625 insertions(+), 381 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/client/src/main/resources/oozie-coordinator-0.5.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/oozie-coordinator-0.5.xsd b/client/src/main/resources/oozie-coordinator-0.5.xsd new file mode 100644 index 0000000..2b63629 --- /dev/null +++ b/client/src/main/resources/oozie-coordinator-0.5.xsd @@ -0,0 +1,194 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:coordinator="uri:oozie:coordinator:0.5" + elementFormDefault="qualified" targetNamespace="uri:oozie:coordinator:0.5"> + + <xs:element name="coordinator-app" type="coordinator:COORDINATOR-APP"/> + <xs:element name="datasets" type="coordinator:DATASETS"/> + <xs:simpleType name="IDENTIFIER"> + <xs:restriction base="xs:string"> + <xs:pattern value="([a-zA-Z]([\-_a-zA-Z0-9])*){1,39}"/> + </xs:restriction> + </xs:simpleType> + <xs:complexType name="COORDINATOR-APP"> + <xs:sequence> + <xs:element name="parameters" type="coordinator:PARAMETERS" minOccurs="0" maxOccurs="1"/> + <xs:element name="controls" type="coordinator:CONTROLS" minOccurs="0" maxOccurs="1"/> + <xs:element name="datasets" type="coordinator:DATASETS" minOccurs="0" maxOccurs="1"/> + <xs:element name="input-events" type="coordinator:INPUTEVENTS" minOccurs="0" maxOccurs="1"/> + <xs:element name="input-logic" type="coordinator:INPUTLOGIC" minOccurs="0" maxOccurs="1"/> + <xs:element name="output-events" type="coordinator:OUTPUTEVENTS" minOccurs="0" maxOccurs="1"/> + <xs:element name="action" type="coordinator:ACTION" minOccurs="1" maxOccurs="1"/> + </xs:sequence> + <xs:attribute name="name" type="xs:string" use="required"/> + <xs:attribute name="frequency" type="xs:string" use="required"/> + <xs:attribute name="start" type="xs:string" use="required"/> + <xs:attribute name="end" type="xs:string" use="required"/> + <xs:attribute name="timezone" type="xs:string" use="required"/> + </xs:complexType> + <xs:complexType name="PARAMETERS"> + <xs:sequence> + <xs:element name="property" minOccurs="1" maxOccurs="unbounded"> + <xs:complexType> + <xs:sequence> + <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/> + <xs:element name="value" minOccurs="0" maxOccurs="1" type="xs:string"/> + <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/> + </xs:sequence> + </xs:complexType> + </xs:element> + </xs:sequence> + </xs:complexType> + <xs:complexType name="CONTROLS"> + <xs:sequence minOccurs="0" maxOccurs="1"> + <xs:element name="timeout" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="concurrency" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="execution" type="xs:string" minOccurs="0" maxOccurs="1"/> + <xs:element name="throttle" type="xs:string" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + </xs:complexType> + <xs:complexType name="DATASETS"> + <xs:sequence minOccurs="0" maxOccurs="1"> + <xs:element name="include" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> + <xs:choice minOccurs="0" maxOccurs="unbounded"> + <xs:element name="dataset" type="coordinator:SYNCDATASET" minOccurs="0" maxOccurs="1"/> + <xs:element name="async-dataset" type="coordinator:ASYNCDATASET" minOccurs="0" maxOccurs="1"/> + </xs:choice> + </xs:sequence> + </xs:complexType> + <xs:complexType name="SYNCDATASET"> + <xs:sequence> + <xs:element name="uri-template" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="done-flag" type="xs:string" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/> + <xs:attribute name="frequency" type="xs:string" use="required"/> + <xs:attribute name="initial-instance" type="xs:string" use="required"/> + <xs:attribute name="timezone" type="xs:string" use="required"/> + </xs:complexType> + <xs:complexType name="ASYNCDATASET"> + <xs:sequence> + <xs:element name="uri-template" type="xs:string" minOccurs="1" maxOccurs="1"/> + </xs:sequence> + <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/> + <xs:attribute name="sequence-type" type="xs:string" use="required"/> + <xs:attribute name="initial-version" type="xs:string" use="required"/> + </xs:complexType> + <xs:complexType name="INPUTEVENTS"> + <xs:choice minOccurs="1" maxOccurs="1"> + <xs:element name="and" type="coordinator:LOGICALAND" minOccurs="0" maxOccurs="1"/> + <xs:element name="or" type="coordinator:LOGICALOR" minOccurs="0" maxOccurs="1"/> + <xs:element name="data-in" type="coordinator:DATAIN" minOccurs="1" maxOccurs="unbounded"/> + </xs:choice> + </xs:complexType> + <xs:complexType name="INPUTLOGIC"> + <xs:choice minOccurs="0" maxOccurs="unbounded"> + <xs:element name="and" type="coordinator:LOGICALAND" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="or" type="coordinator:LOGICALOR" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="combine" type="coordinator:COMBINE" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="data-in" type="coordinator:LOGICALDATAIN" minOccurs="1" maxOccurs="unbounded"/> + </xs:choice> + </xs:complexType> + <xs:complexType name="LOGICALAND"> + <xs:choice minOccurs="0" maxOccurs="unbounded"> + <xs:element name="and" type="coordinator:LOGICALAND" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="or" type="coordinator:LOGICALOR" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="data-in" type="coordinator:LOGICALDATAIN" minOccurs="1" maxOccurs="unbounded"/> + <xs:element name="combine" type="coordinator:COMBINE" minOccurs="0" maxOccurs="unbounded"/> + </xs:choice> + <xs:attribute name="name" type="xs:string" use="optional"/> + <xs:attribute name="min" type="xs:string" use="optional"/> + <xs:attribute name="wait" type="xs:string" use="optional"/> + </xs:complexType> + <xs:complexType name="LOGICALOR"> + <xs:choice minOccurs="0" maxOccurs="unbounded"> + <xs:element name="and" type="coordinator:LOGICALAND" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="or" type="coordinator:LOGICALOR" minOccurs="0" maxOccurs="unbounded"/> + <xs:element name="data-in" type="coordinator:LOGICALDATAIN" minOccurs="1" maxOccurs="unbounded"/> + <xs:element name="combine" type="coordinator:COMBINE" minOccurs="0" maxOccurs="unbounded"/> + </xs:choice> + <xs:attribute name="name" type="xs:string" use="optional"/> + <xs:attribute name="min" type="xs:string" use="optional"/> + <xs:attribute name="wait" type="xs:string" use="optional"/> + </xs:complexType> + <xs:complexType name="COMBINE"> + <xs:choice minOccurs="0" maxOccurs="unbounded"> + <xs:element name="data-in" type="coordinator:LOGICALDATAIN" minOccurs="2" maxOccurs="unbounded"/> + </xs:choice> + <xs:attribute name="name" type="xs:string" use="optional"/> + <xs:attribute name="min" type="xs:string" use="optional"/> + <xs:attribute name="wait" type="xs:string" use="optional"/> + </xs:complexType> + <xs:complexType name="LOGICALDATAIN"> + <xs:attribute name="name" type="xs:string" use="optional"/> + <xs:attribute name="min" type="xs:string" use="optional"/> + <xs:attribute name="wait" type="xs:string" use="optional"/> + <xs:attribute name="dataset" type="xs:string" use="required"/> + </xs:complexType> + <xs:complexType name="DATAIN"> + <xs:choice minOccurs="1" maxOccurs="1"> + <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="unbounded"/> + <xs:sequence minOccurs="1" maxOccurs="1"> + <xs:element name="start-instance" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="end-instance" type="xs:string" minOccurs="1" maxOccurs="1"/> + </xs:sequence> + </xs:choice> + <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/> + <xs:attribute name="dataset" type="xs:string" use="required"/> + </xs:complexType> + <xs:complexType name="OUTPUTEVENTS"> + <xs:sequence minOccurs="1" maxOccurs="1"> + <xs:element name="data-out" type="coordinator:DATAOUT" minOccurs="1" maxOccurs="unbounded"/> + </xs:sequence> + </xs:complexType> + <xs:complexType name="DATAOUT"> + <xs:sequence minOccurs="1" maxOccurs="1"> + <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="1"/> + </xs:sequence> + <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/> + <xs:attribute name="dataset" type="xs:string" use="required"/> + </xs:complexType> + <xs:complexType name="ACTION"> + <xs:sequence minOccurs="1" maxOccurs="1"> + <xs:element name="workflow" type="coordinator:WORKFLOW" minOccurs="1" maxOccurs="1"/> + <xs:any namespace="uri:oozie:sla:0.1 uri:oozie:sla:0.2" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + </xs:complexType> + <xs:complexType name="WORKFLOW"> + <xs:sequence> + <xs:element name="app-path" type="xs:string" minOccurs="1" maxOccurs="1"/> + <xs:element name="configuration" type="coordinator:CONFIGURATION" minOccurs="0" maxOccurs="1"/> + </xs:sequence> + </xs:complexType> + + <xs:complexType name="FLAG"/> + <xs:complexType name="CONFIGURATION"> + <xs:sequence> + <xs:element name="property" minOccurs="1" maxOccurs="unbounded"> + <xs:complexType> + <xs:sequence> + <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/> + <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/> + <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/> + </xs:sequence> + </xs:complexType> + </xs:element> + </xs:sequence> + </xs:complexType> +</xs:schema> http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index b063dab..b72ea7d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -282,6 +282,11 @@ <version>3.4</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-jexl</artifactId> + <scope>compile</scope> + </dependency> <dependency> <groupId>org.apache.oozie</groupId> http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java index 91bff4d..b1be7c9 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java @@ -34,12 +34,15 @@ import javax.persistence.Lob; import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; import javax.persistence.Table; +import javax.persistence.Transient; import org.apache.hadoop.io.Writable; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.rest.JsonBean; import org.apache.oozie.client.rest.JsonTags; import org.apache.oozie.client.rest.JsonUtils; +import org.apache.oozie.coord.input.dependency.CoordInputDependency; +import org.apache.oozie.coord.input.dependency.CoordInputDependencyFactory; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.WritableUtils; import org.apache.openjpa.persistence.jdbc.Index; @@ -285,6 +288,13 @@ public class CoordinatorActionBean implements return toJSONObject("GMT"); } + @Transient + private CoordInputDependency coordPushInputDependency; + + @Transient + private CoordInputDependency coordPullInputDependency; + + public CoordinatorActionBean() { } @@ -745,23 +755,21 @@ public class CoordinatorActionBean implements json.put(JsonTags.COORDINATOR_ACTION_TYPE, type); json.put(JsonTags.COORDINATOR_ACTION_NUMBER, actionNumber); json.put(JsonTags.COORDINATOR_ACTION_CREATED_CONF, getCreatedConf()); - json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils - .formatDateRfc822(getCreatedTime(), timeZoneId)); - json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils - .formatDateRfc822(getNominalTime(), timeZoneId)); + json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils.formatDateRfc822(getCreatedTime(), timeZoneId)); + json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils.formatDateRfc822(getNominalTime(), timeZoneId)); json.put(JsonTags.COORDINATOR_ACTION_EXTERNALID, externalId); // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils // .formatDateRfc822(startTime), timeZoneId); json.put(JsonTags.COORDINATOR_ACTION_STATUS, statusStr); json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, getRunConf()); - json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, JsonUtils - .formatDateRfc822(getLastModifiedTime(), timeZoneId)); + json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, + JsonUtils.formatDateRfc822(getLastModifiedTime(), timeZoneId)); // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils // .formatDateRfc822(startTime), timeZoneId); // json.put(JsonTags.COORDINATOR_ACTION_END_TIME, JsonUtils // .formatDateRfc822(endTime), timeZoneId); - json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getMissingDependencies()); - json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushMissingDependencies()); + json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getPullInputDependencies().getMissingDependencies()); + json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushInputDependencies().getMissingDependencies()); json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, externalStatus); json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, trackerUri); json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, consoleUrl); @@ -818,5 +826,27 @@ public class CoordinatorActionBean implements return true; } + public CoordInputDependency getPullInputDependencies() { + if (coordPullInputDependency == null) { + coordPullInputDependency = CoordInputDependencyFactory.getPullInputDependencies(missingDependencies); + } + return coordPullInputDependency; + + } + + public CoordInputDependency getPushInputDependencies() { + if (coordPushInputDependency == null) { + coordPushInputDependency = CoordInputDependencyFactory.getPushInputDependencies(pushMissingDependencies); + } + return coordPushInputDependency; + } + + public void setPullInputDependencies(CoordInputDependency coordPullInputDependency) { + this.coordPullInputDependency = coordPullInputDependency; + } + + public void setPushInputDependencies(CoordInputDependency coordPushInputDependency) { + this.coordPushInputDependency = coordPushInputDependency; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/ErrorCode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index 6c1e399..2907ca2 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -214,6 +214,8 @@ public enum ErrorCode { E1025(XLog.STD, "Coord status transit error: [{0}]"), E1026(XLog.STD, "SLA alert update command failed: {0}"), E1027(XLog.STD, "SLA change command failed. {0}"), + E1028(XLog.STD, "Coord input logic error. {0}"), + E1100(XLog.STD, "Command precondition does not hold before execution, [{0}]"), http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java index 11184d1..640d3cb 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java @@ -20,13 +20,9 @@ package org.apache.oozie.command.coord; import java.io.IOException; import java.io.StringReader; -import java.net.URI; -import java.net.URISyntaxException; -import java.text.ParseException; import java.util.Calendar; import java.util.Date; import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.AccessControlException; import org.apache.oozie.CoordinatorActionBean; @@ -34,14 +30,11 @@ import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.Job; -import org.apache.oozie.client.OozieClient; import org.apache.oozie.command.CommandException; import org.apache.oozie.command.PreconditionException; import org.apache.oozie.coord.CoordELEvaluator; import org.apache.oozie.coord.CoordELFunctions; -import org.apache.oozie.coord.TimeUnit; -import org.apache.oozie.dependency.URIHandler; -import org.apache.oozie.dependency.URIHandlerException; +import org.apache.oozie.coord.input.dependency.CoordInputDependency; import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; @@ -54,7 +47,6 @@ import org.apache.oozie.service.EventHandlerService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Service; import org.apache.oozie.service.Services; -import org.apache.oozie.service.URIHandlerService; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.ELEvaluator; import org.apache.oozie.util.LogUtils; @@ -159,40 +151,38 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { StringBuilder existList = new StringBuilder(); StringBuilder nonExistList = new StringBuilder(); + CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies(); + CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies(); + + + String missingDependencies = coordPullInputDependency.getMissingDependencies(); StringBuilder nonResolvedList = new StringBuilder(); - String firstMissingDependency = ""; - String missingDeps = coordAction.getMissingDependencies(); - CoordCommandUtils.getResolvedList(missingDeps, nonExistList, nonResolvedList); + CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList); + String firstMissingDependency = ""; // For clarity regarding which is the missing dependency in synchronous order // instead of printing entire list, some of which, may be available - if(nonExistList.length() > 0) { + if (nonExistList.length() > 0) { firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0]; } LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " " + nonResolvedList.toString()); - // Updating the list of data dependencies that are available and those that are yet not - boolean status = checkInput(actionXml, existList, nonExistList, actionConf); - String pushDeps = coordAction.getPushMissingDependencies(); - // Resolve latest/future only when all current missingDependencies and - // pushMissingDependencies are met + + + boolean status = checkResolvedInput(actionXml, existList, nonExistList, actionConf); + String nonExistListStr = nonExistList.toString(); + boolean isPushDependenciesMet = coordPushInputDependency.isDependencyMet(); if (status && nonResolvedList.length() > 0) { - status = (pushDeps == null || pushDeps.length() == 0) ? checkUnResolvedInput(actionXml, actionConf) - : false; + status = (isPushDependenciesMet) ? checkUnResolvedInput(actionXml, actionConf) : false; } coordAction.setLastModifiedTime(currentTime); coordAction.setActionXml(actionXml.toString()); - if (nonResolvedList.length() > 0 && status == false) { - nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList); - } - String nonExistListStr = nonExistList.toString(); - if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) { - // missingDeps null or empty means action should become READY - isChangeInDependency = true; - coordAction.setMissingDependencies(nonExistListStr); - } - if (status && (pushDeps == null || pushDeps.length() == 0)) { - String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId); + + isChangeInDependency = isChangeInDependency(nonExistList, missingDependencies, nonResolvedList, status); + + if (status && isPushDependenciesMet) { + String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId, + coordPullInputDependency, coordPushInputDependency); actionXml.replace(0, actionXml.length(), newActionXml); coordAction.setActionXml(actionXml.toString()); coordAction.setStatus(CoordinatorAction.Status.READY); @@ -207,7 +197,7 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { updateCoordAction(coordAction, isChangeInDependency); } else { - if (!nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0) { + if (!nonExistListStr.isEmpty() && isPushDependenciesMet) { queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); } else { @@ -246,10 +236,25 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { return null; } + private boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies, + StringBuilder nonResolvedList, boolean status) throws IOException { + if (nonResolvedList.length() > 0 && status == false) { + nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList); + } + return coordAction.getPullInputDependencies().isChangeInDependency(nonExistList, missingDependencies, + nonResolvedList, status); + } - static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId) throws Exception { + static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId) + throws Exception { + return resolveCoordConfiguration(actionXml, actionConf, actionId, null, null); + } + + static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId, + CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception { Element eAction = XmlUtils.parseXml(actionXml.toString()); - ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId); + ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId, pullDependencies, + pushDependencies); materializeDataProperties(eAction, actionConf, eval); return XmlUtils.prettyPrint(eAction).toString(); } @@ -268,6 +273,7 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { if (jpaService != null) { try { if (isChangeInDependency) { + coordAction.setMissingDependencies(coordAction.getPullInputDependencies().serialize()); CoordActionQueryExecutor.getInstance().executeUpdate( CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, coordAction); if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) { @@ -281,12 +287,11 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction); } } - catch (JPAExecutorException jex) { + catch (Exception jex) { throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex); } } } - /** * This function reads the value of re-queue interval for coordinator input * check command from the Oozie configuration provided by Configuration @@ -310,16 +315,26 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { * @return true if all input paths are existed * @throws Exception thrown of unable to check input path */ - protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList, + protected boolean checkResolvedInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList, Configuration conf) throws Exception { - Element eAction = XmlUtils.parseXml(actionXml.toString()); - return checkResolvedUris(eAction, existList, nonExistList, conf); + return coordAction.getPullInputDependencies().checkPullMissingDependencies(coordAction, existList, + nonExistList); } - protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception { + /** + * Check un resolved input. + * + * @param coordAction the coord action + * @param actionXml the action xml + * @param conf the conf + * @return true, if successful + * @throws Exception the exception + */ + protected boolean checkUnResolvedInput(CoordinatorActionBean coordAction, StringBuilder actionXml, + Configuration conf) throws Exception { Element eAction = XmlUtils.parseXml(actionXml.toString()); LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future"); - boolean allExist = checkUnresolvedInstances(eAction, conf); + boolean allExist = checkUnresolvedInstances(coordAction, eAction, conf); if (allExist) { actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString()); } @@ -327,6 +342,18 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { } /** + * Check un resolved input. + * + * @param actionXml the action xml + * @param conf the conf + * @return true, if successful + * @throws Exception the exception + */ + protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception { + return checkUnResolvedInput(coordAction, actionXml, conf); + } + + /** * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list * of files that will be needed. * @@ -378,222 +405,23 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> { * @throws Exception thrown if failed to resolve data input and output paths */ @SuppressWarnings("unchecked") - private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception { - String strAction = XmlUtils.prettyPrint(eAction).toString(); - Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time")); - String actualTimeStr = eAction.getAttributeValue("action-actual-time"); - Date actualTime = null; - if (actualTimeStr == null) { - LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " + - "from previous version. Assign current date to actual time, action = " + actionId); - actualTime = new Date(); - } else { - actualTime = DateUtils.parseDateOozieTZ(actualTimeStr); - } + private boolean checkUnresolvedInstances(CoordinatorActionBean coordAction, Element eAction, + Configuration actionConf) throws Exception { - StringBuffer resultedXml = new StringBuffer(); - - boolean ret; - Element inputList = eAction.getChild("input-events", eAction.getNamespace()); - if (inputList != null) { - ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime, - actualTime, actionConf); - if (ret == false) { - resultedXml.append(strAction); - return false; - } - } + boolean ret = coordAction.getPullInputDependencies().checkUnresolved(coordAction, eAction); // Using latest() or future() in output-event is not intuitive. // We need to make sure, this assumption is correct. Element outputList = eAction.getChild("output-events", eAction.getNamespace()); if (outputList != null) { for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { - if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) != null) { + if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) != null) { throw new CommandException(ErrorCode.E1006, "coord:latest()/future()", " not permitted in output-event "); } } } - return true; - } - - /** - * Resolve the list of data input paths - * - * @param eDataEvents the list of data input elements - * @param nominalTime action nominal time - * @param actualTime current time - * @param conf action configuration - * @return true if all unresolved URIs can be resolved - * @throws Exception thrown if failed to resolve data input paths - */ - @SuppressWarnings("unchecked") - private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime, - Configuration conf) throws Exception { - for (Element dEvent : eDataEvents) { - if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) == null) { - continue; - } - ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf); - String uresolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()).getTextTrim(); - String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR); - StringBuffer resolvedTmp = new StringBuffer(); - for (int i = 0; i < unresolvedList.length; i++) { - String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]); - Boolean isResolved = (Boolean) eval.getVariable("is_resolved"); - if (isResolved == false) { - LOG.info("[" + actionId + "]::Cannot resolve: " + ret); - return false; - } - if (resolvedTmp.length() > 0) { - resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR); - } - resolvedTmp.append((String) eval.getVariable("resolved_path")); - } - if (resolvedTmp.length() > 0) { - if (dEvent.getChild("uris", dEvent.getNamespace()) != null) { - resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append( - dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim()); - dEvent.removeChild("uris", dEvent.getNamespace()); - } - Element uriInstance = new Element("uris", dEvent.getNamespace()); - uriInstance.addContent(resolvedTmp.toString()); - dEvent.getContent().add(1, uriInstance); - } - dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()); - } - - return true; - } - - /** - * Check all resolved URIs existence - * - * @param eAction action element - * @param existList the list of existed paths - * @param nonExistList the list of paths to check existence - * @param conf action configuration - * @return true if all nonExistList paths exist - * @throws IOException thrown if unable to access the path - */ - private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList, - Configuration conf) throws IOException { - Element inputList = eAction.getChild("input-events", eAction.getNamespace()); - if (inputList != null) { - if (nonExistList.length() > 0) { - checkListOfPaths(existList, nonExistList, conf); - } - return nonExistList.length() == 0; - } - return true; - } - - /** - * Check a list of non existed paths and add to exist list if it exists - * - * @param existList the list of existed paths - * @param nonExistList the list of paths to check existence - * @param conf action configuration - * @return true if all nonExistList paths exist - * @throws IOException thrown if unable to access the path - */ - private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf) - throws IOException { - - String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR); - if (uriList[0] != null) { - LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing."); - } - - nonExistList.delete(0, nonExistList.length()); - boolean allExists = true; - String existSeparator = "", nonExistSeparator = ""; - String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); - for (int i = 0; i < uriList.length; i++) { - if (allExists) { - allExists = pathExists(uriList[i], conf, user); - LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists); - } - if (allExists) { - existList.append(existSeparator).append(uriList[i]); - existSeparator = CoordELFunctions.INSTANCE_SEPARATOR; - } - else { - nonExistList.append(nonExistSeparator).append(uriList[i]); - nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR; - } - } - return allExists; - } - - /** - * Check if given path exists - * - * @param sPath uri path - * @param actionConf action configuration - * @return true if path exists - * @throws IOException thrown if unable to access the path - */ - protected boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException { - LOG.debug("checking for the file " + sPath); - try { - URI uri = new URI(sPath); - URIHandlerService service = Services.get().get(URIHandlerService.class); - URIHandler handler = service.getURIHandler(uri); - return handler.exists(uri, actionConf, user); - } - catch (URIHandlerException e) { - coordAction.setErrorCode(e.getErrorCode().toString()); - coordAction.setErrorMessage(e.getMessage()); - if (e.getCause() != null && e.getCause() instanceof AccessControlException) { - throw (AccessControlException) e.getCause(); - } - else { - throw new IOException(e); - } - } - catch (URISyntaxException e) { - coordAction.setErrorCode(ErrorCode.E0906.toString()); - coordAction.setErrorMessage(e.getMessage()); - throw new IOException(e); - } - } - - /** - * The function create a list of URIs separated by "," using the instances time stamp and URI-template - * - * @param event : <data-in> event - * @param instances : List of time stamp seprated by "," - * @param unresolvedInstances : list of instance with latest/future function - * @return : list of URIs separated by ",". - * @throws Exception thrown if failed to create URIs from unresolvedInstances - */ - @SuppressWarnings("unused") - private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception { - if (instances == null || instances.length() == 0) { - return ""; - } - String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR); - StringBuilder uris = new StringBuilder(); - - for (int i = 0; i < instanceList.length; i++) { - int funcType = CoordCommandUtils.getFuncType(instanceList[i]); - if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) { - if (unresolvedInstances.length() > 0) { - unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR); - } - unresolvedInstances.append(instanceList[i]); - continue; - } - ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]); - if (uris.length() > 0) { - uris.append(CoordELFunctions.INSTANCE_SEPARATOR); - } - uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild( - "uri-template", event.getNamespace()).getTextTrim())); - } - return uris.toString(); + return ret; } /** http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java index 4e1c5b3..cb866e2 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java @@ -18,11 +18,9 @@ package org.apache.oozie.command.coord; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.List; import org.apache.oozie.command.CommandException; +import org.apache.oozie.coord.input.dependency.CoordInputDependency; import org.apache.oozie.dependency.DependencyChecker; import org.apache.oozie.service.PartitionDependencyManagerService; import org.apache.oozie.service.Services; @@ -35,9 +33,11 @@ public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyC @Override protected Void execute() throws CommandException { + CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies(); + CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies(); + LOG.info("STARTED for Action id [{0}]", actionId); - String pushMissingDeps = coordAction.getPushMissingDependencies(); - if (pushMissingDeps == null || pushMissingDeps.length() == 0) { + if (coordPushInputDependency.isDependencyMet()) { LOG.info("Nothing to check. Empty push missing dependency"); } else { @@ -50,25 +50,19 @@ public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyC } } else { - LOG.debug("Updating with available uris=[{0}] where missing uris=[{1}]", availDepList.toString(), - pushMissingDeps); - - String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps); - List<String> stillMissingDepsList = new ArrayList<String>(Arrays.asList(missingDepsArray)); - stillMissingDepsList.removeAll(availDepList); + String pushMissingDependencies = coordPushInputDependency.getMissingDependencies().toString(); + LOG.debug("Updating with available uris = [{0}] where missing uris = [{1}]", pushMissingDependencies); + String[] missingDependenciesArray = DependencyChecker.dependenciesAsArray(pushMissingDependencies); + coordPushInputDependency.addToAvailableDependencies(availDepList); boolean isChangeInDependency = true; - if (stillMissingDepsList.size() == 0) { + if (coordPushInputDependency.isDependencyMet()) { // All push-based dependencies are available - onAllPushDependenciesAvailable(); + onAllPushDependenciesAvailable(coordPullInputDependency.isDependencyMet()); } else { - if (stillMissingDepsList.size() == missingDepsArray.length) { + if (coordPushInputDependency.getMissingDependenciesAsList().size() == missingDependenciesArray.length) { isChangeInDependency = false; } - else { - String stillMissingDeps = DependencyChecker.dependenciesAsString(stillMissingDepsList); - coordAction.setPushMissingDependencies(stillMissingDeps); - } if (isTimeout()) { // Poll and check as one last try queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100); } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java b/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java index 58ef483..0af7edc 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java @@ -18,9 +18,12 @@ package org.apache.oozie.command.coord; +import java.io.IOException; import java.io.StringReader; import java.net.URI; +import java.net.URISyntaxException; import java.text.ParseException; +import java.util.ArrayList; import java.util.TimeZone; import java.util.Map; import java.util.HashMap; @@ -32,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.command.CommandException; import org.apache.oozie.coord.CoordELEvaluator; import org.apache.oozie.coord.CoordELFunctions; @@ -39,17 +43,25 @@ import org.apache.oozie.coord.CoordUtils; import org.apache.oozie.coord.CoordinatorJobException; import org.apache.oozie.coord.SyncCoordAction; import org.apache.oozie.coord.TimeUnit; +import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil; +import org.apache.oozie.coord.input.dependency.CoordInputDependency; +import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator; +import org.apache.oozie.coord.input.dependency.CoordInputDependencyFactory; +import org.apache.oozie.coord.input.dependency.CoordInputInstance; import org.apache.oozie.dependency.ActionDependency; import org.apache.oozie.dependency.DependencyChecker; import org.apache.oozie.dependency.URIHandler; import org.apache.oozie.dependency.URIHandler.DependencyType; +import org.apache.oozie.dependency.URIHandlerException; import org.apache.oozie.service.Services; import org.apache.oozie.service.URIHandlerService; import org.apache.oozie.service.UUIDService; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.ELEvaluator; +import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; +import org.jdom.Attribute; import org.jdom.Element; import org.jdom.JDOMException; import org.quartz.CronExpression; @@ -63,8 +75,9 @@ public class CoordCommandUtils { public static int OFFSET = 3; public static int ABSOLUTE = 4; public static int UNEXPECTED = -1; + public static final String RESOLVED_UNRESOLVED_SEPARATOR = "!!"; - public static final String UNRESOLVED_INST_TAG = "unresolved-instances"; + public static final String UNRESOLVED_INSTANCES_TAG = "unresolved-instances"; /** * parse a function like coord:latest(n)/future() and return the 'n'. @@ -357,7 +370,7 @@ public class CoordCommandUtils { depList.append(urisWithDoneFlag); } if (unresolvedInstances.length() > 0) { - Element elemInstance = new Element(UNRESOLVED_INST_TAG, event.getNamespace()); + Element elemInstance = new Element(UNRESOLVED_INSTANCES_TAG, event.getNamespace()); elemInstance.addContent(unresolvedInstances.toString()); event.getContent().add(1, elemInstance); } @@ -482,20 +495,24 @@ public class CoordCommandUtils { appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone"))); appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration"))); - Map<String, StringBuilder> dependencyMap = null; + boolean isInputLogicSpecified = CoordUtils.isInputLogicSpecified(eAction); Element inputList = eAction.getChild("input-events", eAction.getNamespace()); List<Element> dataInList = null; if (inputList != null) { dataInList = inputList.getChildren("data-in", eAction.getNamespace()); - dependencyMap = materializeDataEvents(dataInList, appInst, conf); + materializeInputDataEvents(dataInList, appInst, conf, actionBean, isInputLogicSpecified); } + if(isInputLogicSpecified){ + evaluateInputCheck(eAction.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAction.getNamespace()), + CoordELEvaluator.createDataEvaluator(eAction, conf, actionId)); + } Element outputList = eAction.getChild("output-events", eAction.getNamespace()); List<Element> dataOutList = null; if (outputList != null) { dataOutList = outputList.getChildren("data-out", eAction.getNamespace()); - materializeDataEvents(dataOutList, appInst, conf); + materializeOutputDataEvents(dataOutList, appInst, conf); } eAction.removeAttribute("start"); @@ -513,16 +530,6 @@ public class CoordCommandUtils { actionBean.setLastModifiedTime(new Date()); actionBean.setStatus(CoordinatorAction.Status.WAITING); actionBean.setActionNumber(instanceCount); - if (dependencyMap != null) { - StringBuilder sbPull = dependencyMap.get(DependencyType.PULL.name()); - if (sbPull != null) { - actionBean.setMissingDependencies(sbPull.toString()); - } - StringBuilder sbPush = dependencyMap.get(DependencyType.PUSH.name()); - if (sbPush != null) { - actionBean.setPushMissingDependencies(sbPush.toString()); - } - } actionBean.setNominalTime(nominalTime); boolean isSla = CoordCommandUtils.materializeSLA(eAction, actionBean, conf); if (isSla == true) { @@ -544,6 +551,7 @@ public class CoordCommandUtils { } } + /** * @param eAction the actionXml related element * @param actionBean the coordinator action bean @@ -554,12 +562,18 @@ public class CoordCommandUtils { String action = XmlUtils.prettyPrint(eAction).toString(); StringBuilder actionXml = new StringBuilder(action); Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf())); + actionBean.setActionXml(action); + + if (CoordUtils.isInputLogicSpecified(eAction)) { + new CoordInputLogicEvaluatorUtil(actionBean).validateInputLogic(); + } boolean isPushDepAvailable = true; - if (actionBean.getPushMissingDependencies() != null) { - ActionDependency actionDep = DependencyChecker.checkForAvailability( - actionBean.getPushMissingDependencies(), actionConf, true); - if (actionDep.getMissingDependencies().size() != 0) { + String pushMissingDependencies = actionBean.getPushInputDependencies().getMissingDependencies(); + if (pushMissingDependencies != null) { + ActionDependency actionDependencies = DependencyChecker.checkForAvailability(pushMissingDependencies, + actionConf, true); + if (actionDependencies.getMissingDependencies().size() != 0) { isPushDepAvailable = false; } @@ -571,13 +585,16 @@ public class CoordCommandUtils { StringBuilder existList = new StringBuilder(); StringBuilder nonExistList = new StringBuilder(); StringBuilder nonResolvedList = new StringBuilder(); - getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList); - isPullDepAvailable = coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf); + getResolvedList(actionBean.getPullInputDependencies().getMissingDependencies(), nonExistList, nonResolvedList); + isPullDepAvailable = actionBean.getPullInputDependencies().checkPullMissingDependencies(actionBean, + existList, nonExistList); + } if (isPullDepAvailable && isPushDepAvailable) { // Check for latest/future - boolean isLatestFutureDepAvailable = coordActionInput.checkUnResolvedInput(actionXml, actionConf); + boolean isLatestFutureDepAvailable = coordActionInput.checkUnResolvedInput(actionBean, actionXml, + actionConf); if (isLatestFutureDepAvailable) { String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf, actionBean.getId()); @@ -598,17 +615,68 @@ public class CoordCommandUtils { * @param conf * @throws Exception */ - public static Map<String, StringBuilder> materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf - ) throws Exception { + private static void materializeOutputDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf) + throws Exception { if (events == null) { - return null; + return; + } + + for (Element event : events) { + StringBuilder instances = new StringBuilder(); + ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf); + // Handle list of instance tag + resolveInstances(event, instances, appInst, conf, eval); + // Handle start-instance and end-instance + resolveInstanceRange(event, instances, appInst, conf, eval); + // Separate out the unresolved instances + separateResolvedAndUnresolved(event, instances); + + } + } + + private static void evaluateInputCheck(Element root, ELEvaluator evalInputLogic) throws Exception { + for (Object event : root.getChildren()) { + Element inputElement = (Element) event; + + resolveAttribute("dataset", inputElement, evalInputLogic); + resolveAttribute("name", inputElement, evalInputLogic); + resolveAttribute("min", inputElement, evalInputLogic); + resolveAttribute("wait", inputElement, evalInputLogic); + if (!inputElement.getChildren().isEmpty()) { + evaluateInputCheck(inputElement, evalInputLogic); + } } - StringBuilder unresolvedList = new StringBuilder(); - Map<String, StringBuilder> dependencyMap = new HashMap<String, StringBuilder>(); + } + + private static String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException { + Attribute attr = elem.getAttribute(attrName); + String val = null; + if (attr != null) { + try { + val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim()); + } + catch (Exception e) { + throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); + } + attr.setValue(val); + } + return val; + } + + public static void materializeInputDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf, + CoordinatorActionBean actionBean, boolean isInputLogicSpecified) throws Exception { + + if (events == null) { + return; + } + CoordInputDependency coordPullInputDependency = CoordInputDependencyFactory + .createPullInputDependencies(isInputLogicSpecified); + CoordInputDependency coordPushInputDependency = CoordInputDependencyFactory + .createPushInputDependencies(isInputLogicSpecified); + Map<String, String> unresolvedList = new HashMap<String, String>(); + URIHandlerService uriService = Services.get().get(URIHandlerService.class); - StringBuilder pullMissingDep = null; - StringBuilder pushMissingDep = null; for (Element event : events) { StringBuilder instances = new StringBuilder(); @@ -619,41 +687,44 @@ public class CoordCommandUtils { resolveInstanceRange(event, instances, appInst, conf, eval); // Separate out the unresolved instances String resolvedList = separateResolvedAndUnresolved(event, instances); + String name = event.getAttribute("name").getValue(); + if (!resolvedList.isEmpty()) { Element uri = event.getChild("dataset", event.getNamespace()).getChild("uri-template", event.getNamespace()); + String uriTemplate = uri.getText(); URI baseURI = uriService.getAuthorityWithScheme(uriTemplate); URIHandler handler = uriService.getURIHandler(baseURI); + List<CoordInputInstance> inputInstanceList = new ArrayList<CoordInputInstance>(); + + for (String inputInstance : resolvedList.split("#")) { + inputInstanceList.add(new CoordInputInstance(inputInstance, false)); + } + if (handler.getDependencyType(baseURI).equals(DependencyType.PULL)) { - pullMissingDep = (pullMissingDep == null) ? new StringBuilder(resolvedList) : pullMissingDep.append( - CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList); + coordPullInputDependency.addInputInstanceList(name, inputInstanceList); } else { - pushMissingDep = (pushMissingDep == null) ? new StringBuilder(resolvedList) : pushMissingDep.append( - CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList); + coordPushInputDependency.addInputInstanceList(name, inputInstanceList); + } } - String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG, event.getNamespace()); + String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INSTANCES_TAG, event.getNamespace()); if (tmpUnresolved != null) { - if (unresolvedList.length() > 0) { - unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR); - } - unresolvedList.append(tmpUnresolved); + unresolvedList.put(name, tmpUnresolved); } } - if (unresolvedList.length() > 0) { - if (pullMissingDep == null) { - pullMissingDep = new StringBuilder(); - } - pullMissingDep.append(RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedList); + for(String unresolvedDatasetName:unresolvedList.keySet()){ + coordPullInputDependency.addUnResolvedList(unresolvedDatasetName, unresolvedList.get(unresolvedDatasetName)); } - dependencyMap.put(DependencyType.PULL.name(), pullMissingDep); - dependencyMap.put(DependencyType.PUSH.name(), pushMissingDep); - return dependencyMap; - } + actionBean.setPullInputDependencies(coordPullInputDependency); + actionBean.setPushInputDependencies(coordPushInputDependency); + actionBean.setMissingDependencies(coordPullInputDependency.serialize()); + actionBean.setPushMissingDependencies(coordPushInputDependency.serialize()); + } /** * Get resolved string from missDepList * @@ -797,4 +868,19 @@ public class CoordCommandUtils { } return nextNominalTime; } + + public static boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException, + URISyntaxException, URIHandlerException { + URI uri = new URI(sPath); + URIHandlerService service = Services.get().get(URIHandlerService.class); + URIHandler handler = service.getURIHandler(uri); + return handler.exists(uri, actionConf, user); + } + + public static boolean pathExists(String sPath, Configuration actionConf) throws IOException, URISyntaxException, + URIHandlerException { + String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); + return pathExists(sPath, actionConf, user); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java index 39e6ac1..f6c1782 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java @@ -18,6 +18,7 @@ package org.apache.oozie.command.coord; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.AppType; import org.apache.oozie.CoordinatorActionBean; @@ -34,6 +35,7 @@ import org.apache.oozie.command.PreconditionException; import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; import org.apache.oozie.coord.CoordUtils; import org.apache.oozie.coord.TimeUnit; +import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil; import org.apache.oozie.executor.jpa.BatchQueryExecutor; import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor; @@ -148,7 +150,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max(checkDelay, 0)); - if (coordAction.getPushMissingDependencies() != null) { + if (!StringUtils.isEmpty(coordAction.getPushMissingDependencies())) { // TODO: Delay in catchup mode? queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100); } @@ -485,7 +487,6 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), nextTime, actualTime, lastActionNumber, jobConf, actionBean); actionBean.setTimeOut(timeout); - if (!dryrun) { storeToDB(actionBean, action, jobConf); // Storing to table @@ -529,7 +530,6 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = " + actionXml.length()); actionBean.setActionXml(actionXml); - insertList.add(actionBean); writeActionSlaRegistration(actionXml, actionBean, jobConf); } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java index b05344d..2600a2b 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java @@ -21,10 +21,10 @@ package org.apache.oozie.command.coord; import java.io.IOException; import java.io.StringReader; import java.net.URI; -import java.util.Arrays; import java.util.Date; import java.util.List; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorJobBean; @@ -34,7 +34,7 @@ import org.apache.oozie.client.Job; import org.apache.oozie.client.OozieClient; import org.apache.oozie.command.CommandException; import org.apache.oozie.command.PreconditionException; -import org.apache.oozie.dependency.DependencyChecker; +import org.apache.oozie.coord.input.dependency.CoordInputDependency; import org.apache.oozie.dependency.ActionDependency; import org.apache.oozie.dependency.URIHandler; import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor; @@ -113,14 +113,15 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> return null; } - String pushMissingDeps = coordAction.getPushMissingDependencies(); - if (pushMissingDeps == null || pushMissingDeps.length() == 0) { + CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies(); + CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies(); + if (coordPushInputDependency.getMissingDependenciesAsList().size() == 0) { LOG.info("Nothing to check. Empty push missing dependency"); } else { - String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps); - LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]); - LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps); + List<String> missingDependenciesArray = coordPushInputDependency.getMissingDependenciesAsList(); + LOG.info("First Push missing dependency is [{0}] ", missingDependenciesArray.get(0)); + LOG.trace("Push missing dependencies are [{0}] ", missingDependenciesArray); if (registerForNotification) { LOG.debug("Register for notifications is true"); } @@ -134,27 +135,27 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> throw new CommandException(ErrorCode.E1307, e.getMessage(), e); } + + boolean isChangeInDependency = true; + boolean timeout = false; + ActionDependency actionDependency = coordPushInputDependency.checkPushMissingDependencies(coordAction, + registerForNotification); // Check all dependencies during materialization to avoid registering in the cache. // But check only first missing one afterwards similar to // CoordActionInputCheckXCommand for efficiency. listPartitions is costly. - ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf, - !registerForNotification); + if (actionDependency.getMissingDependencies().size() == missingDependenciesArray.size()) { + isChangeInDependency = false; + } + else { + coordPushInputDependency.setMissingDependencies(StringUtils.join( + actionDependency.getMissingDependencies(), CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR)); + } - boolean isChangeInDependency = true; - boolean timeout = false; - if (actionDep.getMissingDependencies().size() == 0) { + if (coordPushInputDependency.isDependencyMet()) { // All push-based dependencies are available - onAllPushDependenciesAvailable(); + onAllPushDependenciesAvailable(coordPullInputDependency.isDependencyMet()); } else { - if (actionDep.getMissingDependencies().size() == missingDepsArray.length) { - isChangeInDependency = false; - } - else { - String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep - .getMissingDependencies()); - coordAction.setPushMissingDependencies(stillMissingDeps); - } // Checking for timeout timeout = isTimeout(); if (timeout) { @@ -166,15 +167,15 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> } } - updateCoordAction(coordAction, isChangeInDependency); + updateCoordAction(coordAction, isChangeInDependency || coordPushInputDependency.isDependencyMet()); if (registerForNotification) { - registerForNotification(actionDep.getMissingDependencies(), actionConf); + registerForNotification(coordPushInputDependency.getMissingDependenciesAsList(), actionConf); } if (removeAvailDependencies) { - unregisterAvailableDependencies(actionDep.getAvailableDependencies()); + unregisterAvailableDependencies(actionDependency.getAvailableDependencies()); } if (timeout) { - unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId); + unregisterMissingDependencies(coordPushInputDependency.getMissingDependenciesAsList(), actionId); } } catch (Exception e) { @@ -183,10 +184,9 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> LOG.debug("Queueing timeout command"); // XCommand.queue() will not work when there is a Exception callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName())); - unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId); + unregisterMissingDependencies(missingDependenciesArray, actionId); } - else if (coordAction.getMissingDependencies() != null - && coordAction.getMissingDependencies().length() > 0) { + else if (coordPullInputDependency.getMissingDependenciesAsList().size() > 0) { // Queue again on exception as RecoveryService will not queue this again with // the action being updated regularly by CoordActionInputCheckXCommand callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), @@ -221,18 +221,18 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> return (timeOut >= 0) && (waitingTime > timeOut); } - protected void onAllPushDependenciesAvailable() throws CommandException { - coordAction.setPushMissingDependencies(""); + protected void onAllPushDependenciesAvailable(boolean isPullDependencyMeet) throws CommandException { Services.get().get(PartitionDependencyManagerService.class) .removeCoordActionWithDependenciesAvailable(coordAction.getId()); - if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) { + if (isPullDependencyMeet) { Date nominalTime = coordAction.getNominalTime(); Date currentTime = new Date(); // The action should become READY only if current time > nominal time; // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time. if (nominalTime.compareTo(currentTime) > 0) { LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current=" - + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime)); + + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + + DateUtils.formatDateOozieTZ(nominalTime)); } else { String actionXml = resolveCoordConfiguration(); @@ -248,6 +248,8 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> // wait till RecoveryService kicks in queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId())); } + coordAction.getPushInputDependencies().setDependencyMet(true); + } private String resolveCoordConfiguration() throws CommandException { @@ -255,7 +257,8 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); StringBuilder actionXml = new StringBuilder(coordAction.getActionXml()); String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf, - actionId); + actionId, coordAction.getPullInputDependencies(), coordAction + .getPushInputDependencies()); actionXml.replace(0, actionXml.length(), newActionXml); return actionXml.toString(); } @@ -270,6 +273,7 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> if (jpaService != null) { try { if (isChangeInDependency) { + coordAction.setPushMissingDependencies(coordAction.getPushInputDependencies().serialize()); CoordActionQueryExecutor.getInstance().executeUpdate( CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, coordAction); if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) { @@ -286,6 +290,9 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> catch (JPAExecutorException jex) { throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex); } + catch (IOException ioe) { + throw new CommandException(ErrorCode.E1021, ioe.getMessage(), ioe); + } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java index d4d1c08..f1f9ab2 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java @@ -53,8 +53,10 @@ import org.apache.oozie.command.SubmitTransitionXCommand; import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; import org.apache.oozie.coord.CoordELEvaluator; import org.apache.oozie.coord.CoordELFunctions; +import org.apache.oozie.coord.CoordUtils; import org.apache.oozie.coord.CoordinatorJobException; import org.apache.oozie.coord.TimeUnit; +import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.service.CoordMaterializeTriggerService; @@ -799,6 +801,11 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand { resolveIODataset(eAppXml); resolveIOEvents(eAppXml, dataNameList); + if (CoordUtils.isInputLogicSpecified(eAppXml)) { + resolveInputLogic(eAppXml.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAppXml.getNamespace()), evalInst, + dataNameList); + } + resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", eAppXml.getNamespace()), evalNofuncs); // TODO: If action or workflow tag is missing, NullPointerException will @@ -896,6 +903,26 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand { } + private void resolveInputLogic(Element root, ELEvaluator evalInputLogic, HashMap<String, String> dataNameList) + throws Exception { + for (Object event : root.getChildren()) { + Element inputElement = (Element) event; + resolveAttribute("dataset", inputElement, evalInputLogic); + String name=resolveAttribute("name", inputElement, evalInputLogic); + resolveAttribute("or", inputElement, evalInputLogic); + resolveAttribute("and", inputElement, evalInputLogic); + resolveAttribute("combine", inputElement, evalInputLogic); + + if (name != null) { + dataNameList.put(name, "data-in"); + } + + if (!inputElement.getChildren().isEmpty()) { + resolveInputLogic(inputElement, evalInputLogic, dataNameList); + } + } + } + /** * Resolve input-events/dataset and output-events/dataset tags. * http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java b/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java index f010a81..eabf473 100644 --- a/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java +++ b/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java @@ -33,4 +33,7 @@ public class CoordELConstants { public static final int SUBMIT_DAYS = 24 * 60; public static final String DEFAULT_DONE_FLAG = "_SUCCESS"; + final public static String RESOLVED_PATH = "resolved_path"; + + final public static String IS_RESOLVED = "is_resolved"; } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java b/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java index 8b2f456..fba8ac1 100644 --- a/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java +++ b/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java @@ -28,6 +28,8 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.command.coord.CoordCommandUtils; +import org.apache.oozie.coord.input.dependency.CoordInputDependency; +import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator; import org.apache.oozie.service.ELService; import org.apache.oozie.service.Services; import org.apache.oozie.util.DateUtils; @@ -141,7 +143,7 @@ public class CoordELEvaluator { uris = uris.replaceAll(CoordELFunctions.INSTANCE_SEPARATOR, CoordELFunctions.DIR_SEPARATOR); eval.setVariable(".dataout." + data.getAttributeValue("name"), uris); } - if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) { + if (data.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, data.getNamespace()) != null) { eval.setVariable(".dataout." + data.getAttributeValue("name") + ".unresolved", "true"); } } @@ -172,7 +174,13 @@ public class CoordELEvaluator { * @return configured ELEvaluator * @throws Exception : If there is any date-time string in wrong format, the exception is thrown */ + public static ELEvaluator createDataEvaluator(Element eJob, Configuration conf, String actionId) throws Exception { + return createDataEvaluator(eJob, conf, actionId, null, null); + } + + public static ELEvaluator createDataEvaluator(Element eJob, Configuration conf, String actionId, + CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception { ELEvaluator e = Services.get().get(ELService.class).createEvaluator("coord-action-start"); setConfigToEval(e, conf); SyncCoordAction appInst = new SyncCoordAction(); @@ -184,6 +192,12 @@ public class CoordELEvaluator { appInst.setTimeUnit(TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"))); appInst.setActionId(actionId); appInst.setName(eJob.getAttributeValue("name")); + appInst.setPullDependencies(pullDependencies); + appInst.setPushDependencies(pushDependencies); + if (CoordUtils.isInputLogicSpecified(eJob)) { + e.setVariable(".actionInputLogic", + XmlUtils.prettyPrint(eJob.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eJob.getNamespace())).toString()); + } } String strActualTime = eJob.getAttributeValue("action-actual-time"); if (strActualTime != null) { @@ -200,11 +214,14 @@ public class CoordELEvaluator { } else { } - if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) { + if (data.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, data.getNamespace()) != null) { e.setVariable(".datain." + data.getAttributeValue("name") + ".unresolved", "true"); // TODO: // check // null } + Element doneFlagElement = data.getChild("done-flag", data.getNamespace()); + String doneFlag = CoordUtils.getDoneFlag(doneFlagElement); + e.setVariable(".datain." + data.getAttributeValue("name") + ".doneFlag", doneFlag); } } events = eJob.getChild("output-events", eJob.getNamespace()); @@ -217,7 +234,7 @@ public class CoordELEvaluator { } else { }// TODO - if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) { + if (data.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, data.getNamespace()) != null) { e.setVariable(".dataout." + data.getAttributeValue("name") + ".unresolved", "true"); // TODO: // check // null
