http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java index 5d23866..ffa0943 100644 --- a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java +++ b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java @@ -19,11 +19,13 @@ package org.apache.oozie.coord; import com.google.common.collect.Lists; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.ErrorCode; import org.apache.oozie.client.OozieClient; import org.apache.oozie.command.CommandException; +import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil; import org.apache.oozie.dependency.URIHandler; import org.apache.oozie.dependency.URIHandler.Context; import org.apache.oozie.service.Services; @@ -32,6 +34,7 @@ import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.ELEvaluator; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XLog; +import org.jdom.JDOMException; import java.net.URI; import java.util.ArrayList; @@ -61,7 +64,6 @@ public class CoordELFunctions { public static final long DAY_MSEC = 24 * HOUR_MSEC; public static final long MONTH_MSEC = 30 * DAY_MSEC; public static final long YEAR_MSEC = 365 * DAY_MSEC; - /** * Used in defining the frequency in 'day' unit. <p> domain: <code> val > 0</code> and should be integer. 
* @@ -348,7 +350,7 @@ public class CoordELFunctions { resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); resolvedURIPaths.append(uriPath); retVal = resolvedInstances.toString(); - eval.setVariable("resolved_path", resolvedURIPaths.toString()); + eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); break; } else if (available >= startOffset) { @@ -356,6 +358,7 @@ public class CoordELFunctions { resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append( INSTANCE_SEPARATOR); resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); + } available++; } @@ -366,6 +369,10 @@ public class CoordELFunctions { checkedInstance++; // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); } + if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) { + eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); + } + } finally { if (uriContext != null) { @@ -375,7 +382,7 @@ public class CoordELFunctions { if (!resolved) { // return unchanged future function with variable 'is_resolved' // to 'false' - eval.setVariable("is_resolved", Boolean.FALSE); + eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE); if (startOffset == endOffset) { retVal = "${coord:future(" + startOffset + ", " + instance + ")}"; } @@ -384,11 +391,11 @@ public class CoordELFunctions { } } else { - eval.setVariable("is_resolved", Boolean.TRUE); + eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE); } } else {// No feasible nominal time - eval.setVariable("is_resolved", Boolean.TRUE); + eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE); retVal = ""; } return retVal; @@ -495,8 +502,24 @@ public class CoordELFunctions { public static String ph3_coord_dataIn(String dataInName) { String uris = ""; ELEvaluator eval = ELEvaluator.getCurrent(); + if (eval.getVariable(".datain." 
+ dataInName) == null + && !StringUtils.isEmpty(eval.getVariable(".actionInputLogic").toString())) { + try { + return new CoordInputLogicEvaluatorUtil().getInputDependencies(dataInName, + (SyncCoordAction) eval.getVariable(COORD_ACTION)); + } + catch (JDOMException e) { + XLog.getLog(CoordELFunctions.class).error(e); + throw new RuntimeException(e.getMessage()); + } + } + uris = (String) eval.getVariable(".datain." + dataInName); - Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved"); + Object unResolvedObj = eval.getVariable(".datain." + dataInName + ".unresolved"); + if (unResolvedObj == null) { + return uris; + } + Boolean unresolved = Boolean.parseBoolean(unResolvedObj.toString()); if (unresolved != null && unresolved.booleanValue() == true) { return "${coord:dataIn('" + dataInName + "')}"; } @@ -835,7 +858,7 @@ public class CoordELFunctions { public static String ph1_coord_dataIn_echo(String n) { ELEvaluator eval = ELEvaluator.getCurrent(); String val = (String) eval.getVariable("oozie.dataname." 
+ n); - if (val == null || val.equals("data-in") == false) { + if ((val == null || val.equals("data-in") == false)) { XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " is not valid"); throw new RuntimeException("data_in_name " + n + " is not valid"); } @@ -1112,7 +1135,8 @@ public class CoordELFunctions { resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); resolvedURIPaths.append(uriPath); retVal = resolvedInstances.toString(); - eval.setVariable("resolved_path", resolvedURIPaths.toString()); + eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); + break; } else if (available <= endOffset) { @@ -1130,6 +1154,9 @@ public class CoordELFunctions { nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); } + if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) { + eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); + } } finally { if (uriContext != null) { @@ -1139,7 +1166,7 @@ public class CoordELFunctions { if (!resolved) { // return unchanged latest function with variable 'is_resolved' // to 'false' - eval.setVariable("is_resolved", Boolean.FALSE); + eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE); if (startOffset == endOffset) { retVal = "${coord:latest(" + startOffset + ")}"; } @@ -1148,11 +1175,11 @@ public class CoordELFunctions { } } else { - eval.setVariable("is_resolved", Boolean.TRUE); + eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE); } } else {// No feasible nominal time - eval.setVariable("is_resolved", Boolean.FALSE); + eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE); } return retVal; }
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/CoordUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/CoordUtils.java b/core/src/main/java/org/apache/oozie/coord/CoordUtils.java index 94c6974..82f9bed 100644 --- a/core/src/main/java/org/apache/oozie/coord/CoordUtils.java +++ b/core/src/main/java/org/apache/oozie/coord/CoordUtils.java @@ -38,6 +38,8 @@ import org.apache.oozie.XException; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.command.CommandException; +import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator; +import org.apache.oozie.coord.input.logic.InputLogicParser; import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; @@ -51,7 +53,9 @@ import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.Pair; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XLog; +import org.apache.oozie.util.XmlUtils; import org.jdom.Element; +import org.jdom.JDOMException; import com.google.common.annotations.VisibleForTesting; @@ -414,4 +418,22 @@ public class CoordUtils { } return params; } + + public static boolean isInputLogicSpecified(String actionXml) throws JDOMException { + return isInputLogicSpecified(XmlUtils.parseXml(actionXml)); + } + + public static boolean isInputLogicSpecified(Element eAction) throws JDOMException { + return eAction.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAction.getNamespace()) != null; + } + + public static String getInputLogic(String actionXml) throws JDOMException { + return getInputLogic(XmlUtils.parseXml(actionXml)); + } + + public static String getInputLogic(Element actionXml) throws JDOMException { + return new 
InputLogicParser().parse(actionXml.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, + actionXml.getNamespace())); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/SyncCoordAction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/SyncCoordAction.java b/core/src/main/java/org/apache/oozie/coord/SyncCoordAction.java index 44258eb..5f6d7a8 100644 --- a/core/src/main/java/org/apache/oozie/coord/SyncCoordAction.java +++ b/core/src/main/java/org/apache/oozie/coord/SyncCoordAction.java @@ -20,6 +20,7 @@ package org.apache.oozie.coord; import java.util.Date; import java.util.TimeZone; +import org.apache.oozie.coord.input.dependency.CoordInputDependency; /** * This class represents a Coordinator action. @@ -34,6 +35,10 @@ public class SyncCoordAction { private TimeUnit timeUnit; private TimeUnit endOfDuration; // End of Month or End of Days + private CoordInputDependency pullDependencies; + private CoordInputDependency pushDependencies; + + public String getActionId() { return this.actionId; } @@ -110,4 +115,21 @@ public class SyncCoordAction { this.endOfDuration = endOfDuration; } + public CoordInputDependency getPullDependencies() { + return pullDependencies; + } + + public void setPullDependencies(CoordInputDependency pullDependencies) { + this.pullDependencies = pullDependencies; + } + + public CoordInputDependency getPushDependencies() { + return pushDependencies; + } + + public void setPushDependencies(CoordInputDependency pushDependencies) { + this.pushDependencies = pushDependencies; + } + + } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java new file mode 100644 index 0000000..0da60ec --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.coord.input.dependency; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.io.Writable; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.coord.CoordCommandUtils; +import org.apache.oozie.coord.CoordELFunctions; +import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil; +import org.apache.oozie.dependency.ActionDependency; +import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.WritableUtils; +import org.jdom.Element; +import org.jdom.JDOMException; + +public abstract class AbstractCoordInputDependency implements Writable, CoordInputDependency { + protected boolean isDependencyMet = false; + /* + * Transient variables only used for processing, not stored in DB. 
+ */ + protected transient Map<String, List<String>> missingDependenciesSet = new HashMap<String, List<String>>(); + protected transient Map<String, List<String>> availableDependenciesSet = new HashMap<String, List<String>>(); + protected Map<String, List<CoordInputInstance>> dependencyMap = new HashMap<String, List<CoordInputInstance>>(); + + public AbstractCoordInputDependency() { + } + + + public AbstractCoordInputDependency(Map<String, List<CoordInputInstance>> dependencyMap) { + this.dependencyMap = dependencyMap; + generateDependencies(); + } + + public void addInputInstanceList(String inputEventName, List<CoordInputInstance> inputInstanceList) { + dependencyMap.put(inputEventName, inputInstanceList); + } + + public Map<String, List<CoordInputInstance>> getDependencyMap() { + return dependencyMap; + } + + public void setDependencyMap(Map<String, List<CoordInputInstance>> dependencyMap) { + this.dependencyMap = dependencyMap; + } + + public void addToAvailableDependencies(String dataSet, CoordInputInstance coordInputInstance) { + coordInputInstance.setAvailability(true); + List<String> availableSet = availableDependenciesSet.get(dataSet); + if (availableSet == null) { + availableSet = new ArrayList<String>(); + availableDependenciesSet.put(dataSet, availableSet); + } + availableSet.add(coordInputInstance.getInputDataInstance()); + removeFromMissingDependencies(dataSet, coordInputInstance); + } + + public void removeFromMissingDependencies(String dataSet, CoordInputInstance coordInputInstance) { + coordInputInstance.setAvailability(true); + List<String> missingSet = missingDependenciesSet.get(dataSet); + if (missingSet != null) { + missingSet.remove(coordInputInstance.getInputDataInstance()); + if (missingSet.isEmpty()) { + missingDependenciesSet.remove(dataSet); + } + } + + } + + public void addToMissingDependencies(String dataSet, CoordInputInstance coordInputInstance) { + List<String> availableSet = missingDependenciesSet.get(dataSet); + if (availableSet == 
null) { + availableSet = new ArrayList<String>(); + } + availableSet.add(coordInputInstance.getInputDataInstance()); + missingDependenciesSet.put(dataSet, availableSet); + + } + + protected void generateDependencies() { + try { + missingDependenciesSet = new HashMap<String, List<String>>(); + availableDependenciesSet = new HashMap<String, List<String>>(); + + Set<String> keySets = dependencyMap.keySet(); + for (String key : keySets) { + for (CoordInputInstance coordInputInstance : dependencyMap.get(key)) + if (coordInputInstance.isAvailable()) { + addToAvailableDependencies(key, coordInputInstance); + } + else { + addToMissingDependencies(key, coordInputInstance); + } + } + } + catch (Exception e) { + throw new RuntimeException(e); + } + + } + + public List<String> getAvailableDependencies(String dataSet) { + if (availableDependenciesSet.get(dataSet) != null) { + return availableDependenciesSet.get(dataSet); + } + else { + return new ArrayList<String>(); + } + + } + + public String getMissingDependencies(String dataSet) { + StringBuilder sb = new StringBuilder(); + for (String dependencies : missingDependenciesSet.get(dataSet)) { + sb.append(dependencies).append("#"); + } + return sb.toString(); + } + + public void addToAvailableDependencies(String dataSet, String availableSet) { + List<CoordInputInstance> list = dependencyMap.get(dataSet); + if (list == null) { + list = new ArrayList<CoordInputInstance>(); + dependencyMap.put(dataSet, list); + } + + for (String available : availableSet.split(CoordELFunctions.INSTANCE_SEPARATOR)) { + CoordInputInstance coordInstance = new CoordInputInstance(available, true); + list.add(coordInstance); + addToAvailableDependencies(dataSet, coordInstance); + } + + } + + public String getMissingDependencies() { + StringBuilder sb = new StringBuilder(); + if (missingDependenciesSet != null) { + for (List<String> dependenciesList : missingDependenciesSet.values()) { + for (String dependencies : dependenciesList) { + 
sb.append(dependencies).append("#"); + } + } + } + return sb.toString(); + } + + public List<String> getMissingDependenciesAsList() { + List<String> missingDependencies = new ArrayList<String>(); + for (List<String> dependenciesList : missingDependenciesSet.values()) { + missingDependencies.addAll(dependenciesList); + } + return missingDependencies; + } + + public List<String> getAvailableDependenciesAsList() { + List<String> availableDependencies = new ArrayList<String>(); + for (List<String> dependenciesList : availableDependenciesSet.values()) { + availableDependencies.addAll(dependenciesList); + + } + return availableDependencies; + } + + public String serialize() throws IOException { + return CoordInputDependencyFactory.getMagicNumber() + + new String(WritableUtils.toByteArray(this), CoordInputDependencyFactory.CHAR_ENCODING); + + } + + public String getListAsString(List<String> dataSets) { + StringBuilder sb = new StringBuilder(); + for (String dependencies : dataSets) { + sb.append(dependencies).append("#"); + } + + return sb.toString(); + } + + public void setDependencyMet(boolean isDependencyMeet) { + this.isDependencyMet = isDependencyMeet; + } + + public boolean isDependencyMet() { + return missingDependenciesSet.isEmpty() || isDependencyMet; + } + + public boolean isUnResolvedDependencyMet() { + return false; + } + + + @Override + public void addToAvailableDependencies(Collection<String> availableList) { + for (Entry<String, List<CoordInputInstance>> dependenciesList : dependencyMap.entrySet()) { + for (CoordInputInstance coordInputInstance : dependenciesList.getValue()) { + if (availableList.contains(coordInputInstance.getInputDataInstance())) + addToAvailableDependencies(dependenciesList.getKey(), coordInputInstance); + } + } + } + + @Override + public ActionDependency checkPushMissingDependencies(CoordinatorActionBean coordAction, + boolean registerForNotification) throws CommandException, IOException, + JDOMException { + boolean status = new 
CoordInputLogicEvaluatorUtil(coordAction).checkPushDependencies(); + if (status) { + coordAction.getPushInputDependencies().setDependencyMet(true); + } + return new ActionDependency(coordAction.getPushInputDependencies().getMissingDependenciesAsList(), coordAction + .getPushInputDependencies().getAvailableDependenciesAsList()); + + } + + public boolean checkPullMissingDependencies(CoordinatorActionBean coordAction, + StringBuilder existList, StringBuilder nonExistList) throws IOException, JDOMException { + boolean status = new CoordInputLogicEvaluatorUtil(coordAction).checkPullMissingDependencies(); + if (status) { + coordAction.getPullInputDependencies().setDependencyMet(true); + } + return status; + + } + + public boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies, + StringBuilder nonResolvedList, boolean status) { + if (!StringUtils.isEmpty(missingDependencies)) { + return !missingDependencies.equals(getMissingDependencies()); + } + else { + return true; + } + } + + @SuppressWarnings("unchecked") + public boolean checkUnresolved(CoordinatorActionBean coordAction, Element eAction) + throws Exception { + String actualTimeStr = eAction.getAttributeValue("action-actual-time"); + Element inputList = eAction.getChild("input-events", eAction.getNamespace()); + Date actualTime = null; + if (actualTimeStr == null) { + actualTime = new Date(); + } + else { + actualTime = DateUtils.parseDateOozieTZ(actualTimeStr); + } + if (inputList == null) { + return true; + } + List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace()); + for (Element dEvent : eDataEvents) { + if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) == null) { + continue; + } + String unResolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, + dEvent.getNamespace()).getTextTrim(); + String name = dEvent.getAttribute("name").getValue(); + addUnResolvedList(name, unResolvedInstance); + } + 
return new CoordInputLogicEvaluatorUtil(coordAction).checkUnResolved(actualTime); + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeStringAsBytes(out,INTERNAL_VERSION_ID); + out.writeBoolean(isDependencyMet); + WritableUtils.writeMapWithList(out, dependencyMap); + + } + + @Override + public void readFields(DataInput in) throws IOException { + WritableUtils.readBytesAsString(in); + this.isDependencyMet = in.readBoolean(); + dependencyMap = WritableUtils.readMapWithList(in, CoordInputInstance.class); + generateDependencies(); + } + + public boolean isDataSetResolved(String dataSet){ + if(getAvailableDependencies(dataSet) ==null|| getDependencyMap().get(dataSet) == null){ + return false; + } + return getAvailableDependencies(dataSet).size() == getDependencyMap().get(dataSet).size(); + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java new file mode 100644 index 0000000..cf0edd0 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.coord.input.dependency; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.dependency.ActionDependency; +import org.jdom.Element; +import org.jdom.JDOMException; + +public interface CoordInputDependency { + + public static final String INTERNAL_VERSION_ID = "V=1"; + + /** + * Adds the input instance list. + * + * @param inputEventName the input event name + * @param inputInstanceList the input instance list + */ + public void addInputInstanceList(String inputEventName, List<CoordInputInstance> inputInstanceList); + + /** + * Gets the missing dependencies. + * + * @return the missing dependencies + */ + public String getMissingDependencies(); + + /** + * Checks if dependencies are met. + * + * @return true, if dependencies are met + */ + public boolean isDependencyMet(); + + /** + * Checks if unresolved dependencies are met. + * + * @return true, if unresolved dependencies are met + */ + public boolean isUnResolvedDependencyMet(); + + /** + * Sets whether the dependency is met. + * + * @param isMissingDependenciesMet true, if the dependency is met + */ + public void setDependencyMet(boolean isMissingDependenciesMet); + + /** + * Serializes this dependency to a string. + * + * @return the serialized string + * @throws IOException Signals that an I/O exception has occurred. + */ + public String serialize() throws IOException; + + /** + * Gets the missing dependencies as list.
+ * + * @return the missing dependencies as list + */ + public List<String> getMissingDependenciesAsList(); + + /** + * Gets the available dependencies as list. + * + * @return the available dependencies as list + */ + public List<String> getAvailableDependenciesAsList(); + + /** + * Sets the missing dependencies. + * + * @param missingDependencies the new missing dependencies + */ + public void setMissingDependencies(String missingDependencies); + + /** + * Adds the unresolved list. + * + * @param name the name + * @param tmpUnresolved the unresolved instances + */ + public void addUnResolvedList(String name, String tmpUnresolved); + + /** + * Gets the available dependencies. + * + * @param dataSet the data set + * @return the available dependencies + */ + public List<String> getAvailableDependencies(String dataSet); + + /** + * Adds to the available dependencies. + * + * @param availDepList the list of available dependencies + */ + public void addToAvailableDependencies(Collection<String> availDepList); + + /** + * Checks the push missing dependencies. + * + * @param coordAction the coord action + * @param registerForNotification the register for notification + * @return the action dependency + * @throws CommandException the command exception + * @throws IOException Signals that an I/O exception has occurred.
+ * @throws JDOMException the JDOM exception + */ + public boolean checkPullMissingDependencies(CoordinatorActionBean coordAction, StringBuilder existList, + StringBuilder nonExistList) throws IOException, JDOMException; + + /** + * Checks if there is a change in dependency. + * + * @param nonExistList the non exist list + * @param missingDependencies the missing dependencies + * @param nonResolvedList the non resolved list + * @param status the status + * @return true, if there is a change in dependency + */ + public boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies, + StringBuilder nonResolvedList, boolean status); + + /** + * Checks the unresolved dependencies. + * + * @param coordAction the coord action + * @param eAction the action XML element + * @return true, if successful + * @throws Exception the exception + */ + public boolean checkUnresolved(CoordinatorActionBean coordAction, Element eAction) + throws Exception; + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependencyFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependencyFactory.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependencyFactory.java new file mode 100644 index 0000000..ad50890 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependencyFactory.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.coord.input.dependency; + +import java.io.UnsupportedEncodingException; + +import org.apache.commons.lang.StringUtils; +import org.apache.oozie.StringBlob; +import org.apache.oozie.util.WritableUtils; +import org.apache.oozie.util.XLog; + +public class CoordInputDependencyFactory { + + // We need to choose magic number which is not allowed for file/dir. + // Magic number is ::$ + private static final byte[] MAGIC_NUMBER = new byte[] { 58, 58, 36 }; + public static final String CHAR_ENCODING = "ISO-8859-1"; + + public static XLog LOG = XLog.getLog(CoordInputDependencyFactory.class); + + /** + * Create the pull dependencies. + * + * @param isInputLogicSpecified to check if input logic is enable + * @return the pull dependencies + */ + public static CoordInputDependency createPullInputDependencies(boolean isInputLogicSpecified) { + if (!isInputLogicSpecified) { + return new CoordOldInputDependency(); + } + else { + return new CoordPullInputDependency(); + } + } + + /** + * Create the push dependencies. + * + * @param isInputLogicSpecified to check if input logic is enable + * @return the push dependencies + */ + public static CoordInputDependency createPushInputDependencies(boolean isInputLogicSpecified) { + if (!isInputLogicSpecified) { + return new CoordOldInputDependency(); + } + else { + return new CoordPushInputDependency(); + } + } + + /** + * Gets the pull input dependencies. 
+ * + * @param missingDependencies the missing dependencies + * @return the pull input dependencies + */ + public static CoordInputDependency getPullInputDependencies(StringBlob missingDependencies) { + if (missingDependencies == null) { + return new CoordPullInputDependency(); + } + return getPullInputDependencies(missingDependencies.getString()); + } + + public static CoordInputDependency getPullInputDependencies(String dependencies) { + + if (StringUtils.isEmpty(dependencies)) { + return new CoordPullInputDependency(); + } + + if (!hasInputLogic(dependencies)) { + return new CoordOldInputDependency(dependencies); + } + else + try { + return WritableUtils.fromByteArray(getDependenciesWithoutMagicNumber(dependencies).getBytes(CHAR_ENCODING), + CoordPullInputDependency.class); + } + catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + /** + * Gets the push input dependencies. + * + * @param pushMissingDependencies the push missing dependencies + * @return the push input dependencies + */ + public static CoordInputDependency getPushInputDependencies(StringBlob pushMissingDependencies) { + + if (pushMissingDependencies == null) { + return new CoordPushInputDependency(); + } + return getPushInputDependencies(pushMissingDependencies.getString()); + + } + + public static CoordInputDependency getPushInputDependencies(String dependencies) { + + + if (StringUtils.isEmpty(dependencies)) { + return new CoordPushInputDependency(); + } + if (!hasInputLogic(dependencies)) { + return new CoordOldInputDependency(dependencies); + } + + else { + try { + return WritableUtils.fromByteArray(getDependenciesWithoutMagicNumber(dependencies).getBytes(CHAR_ENCODING), + CoordPushInputDependency.class); + } + catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + + } + } + + /** + * Checks if input logic is enable. 
+ * + * @param dependencies the dependencies + * @return true, if is input logic enable + */ + private static boolean hasInputLogic(String dependencies) { + return dependencies.startsWith(getMagicNumber()); + } + + /** + * Gets the magic number. + * + * @return the magic number + */ + public static String getMagicNumber() { + try { + return new String(MAGIC_NUMBER, CHAR_ENCODING); + } + catch (UnsupportedEncodingException e) { + throw new RuntimeException(e.getMessage()); + } + } + + /** + * Gets the dependencies without magic number. + * + * @param dependencies the dependencies + * @return the dependencies without magic number + */ + public static String getDependenciesWithoutMagicNumber(String dependencies) { + return dependencies.substring(getMagicNumber().length()); + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputInstance.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputInstance.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputInstance.java new file mode 100644 index 0000000..945fe44 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputInstance.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.oozie.coord.input.dependency;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.oozie.util.WritableUtils;

/**
 * One input data instance together with a flag telling whether it is available.
 * <p>
 * The pair is written and read as the instance string followed by the boolean flag.
 */
public class CoordInputInstance implements Writable {

    private String inputDataInstance = "";
    private boolean availability;

    /**
     * Creates an instance with an empty data instance string that is marked unavailable.
     */
    public CoordInputInstance() {
    }

    /**
     * Creates an instance with the given values.
     *
     * @param inputDataInstance the input data instance
     * @param availability whether the instance is available
     */
    public CoordInputInstance(String inputDataInstance, boolean availability) {
        this.availability = availability;
        this.inputDataInstance = inputDataInstance;
    }

    /**
     * Gets the input data instance.
     *
     * @return the input data instance
     */
    public String getInputDataInstance() {
        return inputDataInstance;
    }

    /**
     * Checks if the instance is available.
     *
     * @return true, if it is available
     */
    public boolean isAvailable() {
        return availability;
    }

    /**
     * Sets the availability flag.
     *
     * @param availability the new availability
     */
    public void setAvailability(boolean availability) {
        this.availability = availability;
    }

    /**
     * Renders the instance as {@code <instance> : <available>}.
     */
    @Override
    public String toString() {
        return new StringBuilder()
                .append(getInputDataInstance())
                .append(" : ")
                .append(isAvailable())
                .toString();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Order matters: readFields consumes the fields in the same sequence.
        WritableUtils.writeStr(out, inputDataInstance);
        out.writeBoolean(availability);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        inputDataInstance = WritableUtils.readStr(in);
        availability = in.readBoolean();
    }

}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java new file mode 100644 index 0000000..9fc348f --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.oozie.coord.input.dependency;

import java.io.IOException;
import java.io.StringReader;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordCommandUtils;
import org.apache.oozie.coord.CoordELConstants;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.DependencyChecker;
import org.apache.oozie.dependency.URIHandlerException;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.JDOMException;

/**
 * Old approach where dependencies are stored as String.
 * <p>
 * The whole state of this implementation is the single {@code missingDependencies} string; an empty string
 * means that every dependency is met.
 */
public class CoordOldInputDependency implements CoordInputDependency {

    private XLog log = XLog.getLog(getClass());

    protected transient String missingDependencies = "";

    /**
     * Creates a dependency holder initialised with the given missing-dependency string.
     *
     * @param missingDependencies the missing dependencies
     */
    public CoordOldInputDependency(String missingDependencies) {
        this.missingDependencies = missingDependencies;
    }

    /**
     * Creates a dependency holder with no missing dependencies.
     */
    public CoordOldInputDependency() {
    }

    /**
     * Appends the given instances to the missing-dependency string; the event name is not used.
     */
    @Override
    public void addInputInstanceList(String inputEventName, List<CoordInputInstance> inputInstanceList) {
        appendToDependencies(inputInstanceList);
    }

    @Override
    public String getMissingDependencies() {
        return missingDependencies;
    }

    /**
     * @return true when the missing-dependency string is null or empty
     */
    @Override
    public boolean isDependencyMet() {
        return StringUtils.isEmpty(missingDependencies);
    }

    /**
     * @return always false in this implementation
     */
    @Override
    public boolean isUnResolvedDependencyMet() {
        return false;
    }

    /**
     * Clears the missing-dependency string when called with true; a false argument changes nothing.
     */
    @Override
    public void setDependencyMet(boolean isDependencyMeet) {
        if (!isDependencyMeet) {
            return;
        }
        missingDependencies = "";
    }

    /**
     * @return the missing-dependency string itself; no further encoding is applied
     */
    @Override
    public String serialize() throws IOException {
        return missingDependencies;
    }

    @Override
    public List<String> getMissingDependenciesAsList() {
        String[] missing = DependencyChecker.dependenciesAsArray(missingDependencies);
        return Arrays.asList(missing);
    }

    /**
     * @return always a new, empty list; this implementation does not record available dependencies
     */
    @Override
    public List<String> getAvailableDependenciesAsList() {
        return new ArrayList<String>();
    }

    @Override
    public void setMissingDependencies(String missingDependencies) {
        this.missingDependencies = missingDependencies;
    }

    /**
     * Appends the data instance of every given input instance to the missing-dependency string, separated by
     * {@link CoordELFunctions#INSTANCE_SEPARATOR}.
     *
     * @param inputInstanceList the instances to append
     */
    public void appendToDependencies(List<CoordInputInstance> inputInstanceList) {
        StringBuilder joined = new StringBuilder(missingDependencies);
        boolean first = true;
        for (CoordInputInstance instance : inputInstanceList) {
            // The first instance is separated only from pre-existing text; later ones always get a separator.
            if (!first || joined.length() > 0) {
                joined.append(CoordELFunctions.INSTANCE_SEPARATOR);
            }
            joined.append(instance.getInputDataInstance());
            first = false;
        }
        missingDependencies = joined.toString();
    }

    /**
     * Appends the unresolved dependencies after {@link CoordCommandUtils#RESOLVED_UNRESOLVED_SEPARATOR}; the
     * name is not used.
     */
    @Override
    public void addUnResolvedList(String name, String unresolvedDependencies) {
        missingDependencies = new StringBuilder(missingDependencies)
                .append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR)
                .append(unresolvedDependencies)
                .toString();
    }

    /**
     * @return always null in this implementation
     */
    @Override
    public List<String> getAvailableDependencies(String dataSet) {
        return null;
    }

    /**
     * Removes every entry of the given collection from the missing-dependency string. Does nothing when there
     * are no missing dependencies.
     */
    @Override
    public void addToAvailableDependencies(Collection<String> availableList) {
        if (StringUtils.isEmpty(missingDependencies)) {
            return;
        }
        String[] missing = DependencyChecker.dependenciesAsArray(missingDependencies);
        // Copy into an ArrayList: Arrays.asList alone is fixed-size and cannot have elements removed.
        List<String> stillMissing = new ArrayList<String>(Arrays.asList(missing));
        stillMissing.removeAll(availableList);
        missingDependencies = DependencyChecker.dependenciesAsString(stillMissing);
    }

    /**
     * Checks the paths listed in {@code nonExistList} when the action XML has an {@code input-events} element.
     *
     * @param coordAction the coordinator action whose run configuration and action XML are read
     * @param existList receives the paths found to exist
     * @param nonExistList the paths to check on entry; rewritten to the paths still missing
     * @return true when no {@code input-events} element exists or nothing is missing afterwards
     */
    @Override
    public boolean checkPullMissingDependencies(CoordinatorActionBean coordAction, StringBuilder existList,
            StringBuilder nonExistList) throws IOException, JDOMException {
        Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
        Element eAction = XmlUtils.parseXml(coordAction.getActionXml());

        Element inputEvents = eAction.getChild("input-events", eAction.getNamespace());
        if (inputEvents == null) {
            return true;
        }
        if (nonExistList.length() > 0) {
            checkListOfPaths(coordAction, existList, nonExistList, actionConf);
        }
        return nonExistList.length() == 0;
    }

    /**
     * Delegates to {@link DependencyChecker#checkForAvailability} with the current missing dependencies and the
     * action's run configuration; the last argument passed is the negation of
     * {@code registerForNotification}.
     */
    public ActionDependency checkPushMissingDependencies(CoordinatorActionBean coordAction,
            boolean registerForNotification) throws CommandException, IOException {
        Configuration runConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
        return DependencyChecker.checkForAvailability(getMissingDependenciesAsList(), runConf,
                !registerForNotification);
    }

    /**
     * Checks the paths in {@code nonExistList} in order and splits them into existing and missing ones.
     * <p>
     * Once one path is found missing, the remaining paths are no longer probed and are all recorded as missing.
     *
     * @return true when every path exists
     */
    private boolean checkListOfPaths(CoordinatorActionBean coordAction, StringBuilder existList,
            StringBuilder nonExistList, Configuration conf) throws IOException {

        String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
        if (uriList[0] != null) {
            log.info("[" + coordAction.getId() + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0]
                    + " is Missing.");
        }

        // The list is rebuilt below from the paths that are still missing.
        nonExistList.setLength(0);
        boolean allExists = true;
        String existSeparator = "";
        String nonExistSeparator = "";
        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
        for (String uri : uriList) {
            if (allExists) {
                allExists = pathExists(coordAction, uri, conf, user);
                log.info("[" + coordAction.getId() + "]::ActionInputCheck:: File:" + uri + ", Exists? :"
                        + allExists);
            }
            if (allExists) {
                existList.append(existSeparator).append(uri);
                existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
            }
            else {
                nonExistList.append(nonExistSeparator).append(uri);
                nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
            }
        }
        return allExists;
    }

    /**
     * Checks whether a path exists via {@link CoordCommandUtils#pathExists}.
     * <p>
     * On failure the error code and message are recorded on the action (when one is given). The failure is then
     * rethrown as an {@link IOException}, except that an {@link AccessControlException} cause is rethrown as
     * itself.
     */
    public boolean pathExists(CoordinatorActionBean coordAction, String sPath, Configuration actionConf, String user)
            throws IOException {
        log.debug("checking for the file " + sPath);
        try {
            return CoordCommandUtils.pathExists(sPath, actionConf, user);
        }
        catch (URIHandlerException e) {
            if (coordAction != null) {
                coordAction.setErrorCode(e.getErrorCode().toString());
                coordAction.setErrorMessage(e.getMessage());
            }
            // instanceof is false for a null cause, so no separate null test is needed.
            if (e.getCause() instanceof AccessControlException) {
                throw (AccessControlException) e.getCause();
            }
            log.error(e);
            throw new IOException(e);
        }
        catch (URISyntaxException e) {
            if (coordAction != null) {
                coordAction.setErrorCode(ErrorCode.E0906.toString());
                coordAction.setErrorMessage(e.getMessage());
            }
            log.error(e);
            throw new IOException(e);
        }
    }

    /**
     * Stores {@code nonExistList} as the new missing dependencies when it differs from
     * {@code missingDependencies}, or when {@code missingDependencies} is empty.
     * <p>
     * {@code nonResolvedList} and {@code status} are not used by this implementation.
     *
     * @return true when the missing dependencies were updated
     */
    public boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies,
            StringBuilder nonResolvedList, boolean status) {
        String stillMissing = nonExistList.toString();
        boolean changed = !stillMissing.equals(missingDependencies) || missingDependencies.isEmpty();
        if (changed) {
            setMissingDependencies(stillMissing);
        }
        return changed;
    }

    /**
     * Evaluates the unresolved instances of every {@code data-in} event of the action XML.
     * <p>
     * For each event that carries an unresolved-instances element, every instance expression is evaluated. If
     * one of them is reported as not resolved, the method returns false at once. Otherwise the resolved paths,
     * followed by the event's existing {@code uris} text if any, replace the {@code uris} child, and the
     * unresolved-instances element is removed.
     *
     * @param coordAction the coordinator action, used for its run configuration and id
     * @param eAction the parsed action XML; it is modified in place
     * @return true when every unresolved instance could be resolved
     */
    @SuppressWarnings("unchecked")
    public boolean checkUnresolved(CoordinatorActionBean coordAction, Element eAction)
            throws Exception {
        Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
        String actualTimeStr = eAction.getAttributeValue("action-actual-time");
        Element inputList = eAction.getChild("input-events", eAction.getNamespace());

        List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace());
        Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));

        Date actualTime = (actualTimeStr == null) ? new Date() : DateUtils.parseDateOozieTZ(actualTimeStr);

        for (Element dEvent : eDataEvents) {
            if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) == null) {
                continue;
            }
            ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, actionConf);
            String unResolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG,
                    dEvent.getNamespace()).getTextTrim();
            String[] expressions = unResolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
            StringBuilder resolvedPaths = new StringBuilder();
            for (String expression : expressions) {
                String returnData = CoordELFunctions.evalAndWrap(eval, expression);
                Boolean isResolved = (Boolean) eval.getVariable(CoordELConstants.IS_RESOLVED);
                if (!isResolved) {
                    log.info("[" + coordAction.getId() + "] :: Cannot resolve : " + returnData);
                    return false;
                }
                if (resolvedPaths.length() > 0) {
                    resolvedPaths.append(CoordELFunctions.INSTANCE_SEPARATOR);
                }
                resolvedPaths.append((String) eval.getVariable(CoordELConstants.RESOLVED_PATH));
            }
            if (resolvedPaths.length() > 0) {
                if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
                    resolvedPaths.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
                            dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
                    dEvent.removeChild("uris", dEvent.getNamespace());
                }
                Element uriInstance = new Element("uris", dEvent.getNamespace());
                uriInstance.addContent(resolvedPaths.toString());
                dEvent.getContent().add(1, uriInstance);
            }
            dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace());
        }

        return true;
    }

}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java new file mode 100644 index 0000000..f20dcae --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.oozie.coord.input.dependency;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.oozie.command.coord.CoordCommandUtils;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.util.WritableUtils;

/**
 * Pull input dependency that additionally tracks, per dataset name, the dependencies that are still
 * unresolved.
 */
public class CoordPullInputDependency extends AbstractCoordInputDependency {
    private Map<String, CoordUnResolvedInputDependency> unResolvedList = new HashMap<String, CoordUnResolvedInputDependency>();

    public CoordPullInputDependency() {
        super();
    }

    /**
     * Adds resolved entries to the unresolved dependency registered for a dataset.
     *
     * @param dataSet the dataset name; it must already have been registered through
     *        {@link #addUnResolvedList(String, String)}, otherwise this fails with a NullPointerException
     * @param list the resolved entries, separated by {@code ","}
     */
    public void addResolvedList(String dataSet, String list) {
        unResolvedList.get(dataSet).addResolvedList(Arrays.asList(list.split(",")));
    }

    /**
     * @param dataSet the dataset name
     * @return the unresolved dependency registered for the dataset, or null if there is none
     */
    public CoordUnResolvedInputDependency getUnResolvedDependency(String dataSet) {
        return unResolvedList.get(dataSet);
    }

    /**
     * @return true when every registered unresolved dependency is marked resolved (also when none is
     *         registered)
     */
    @Override
    public boolean isUnResolvedDependencyMet() {
        for (CoordUnResolvedInputDependency coordUnResolvedDependency : unResolvedList.values()) {
            if (!coordUnResolvedDependency.isResolved()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Registers the unresolved dependencies of a dataset, replacing any earlier registration.
     *
     * @param dataSet the dataset name
     * @param dependency the unresolved dependencies, separated by {@code "#"}
     */
    @Override
    public void addUnResolvedList(String dataSet, String dependency) {
        unResolvedList.put(dataSet, new CoordUnResolvedInputDependency(Arrays.asList(dependency.split("#"))));
    }

    /**
     * @return the missing dependencies reported by the superclass, followed by
     *         {@link CoordCommandUtils#RESOLVED_UNRESOLVED_SEPARATOR} and the unresolved missing
     *         dependencies when there are any
     */
    @Override
    public String getMissingDependencies() {
        StringBuilder missing = new StringBuilder(super.getMissingDependencies());
        String unresolvedMissingDependencies = getUnresolvedMissingDependencies();
        if (!StringUtils.isEmpty(unresolvedMissingDependencies)) {
            missing.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR);
            missing.append(unresolvedMissingDependencies);
        }
        return missing.toString();
    }

    /**
     * @return the unresolved lists of all dependencies not yet marked resolved, joined with
     *         {@link CoordELFunctions#INSTANCE_SEPARATOR}; empty when there are none
     */
    public String getUnresolvedMissingDependencies() {
        StringBuilder unresolved = new StringBuilder();
        if (unResolvedList != null) {
            for (CoordUnResolvedInputDependency coordUnResolvedDependency : unResolvedList.values()) {
                if (!coordUnResolvedDependency.isResolved()) {
                    String unresolvedList = coordUnResolvedDependency.getUnResolvedList();
                    if (unresolved.length() > 0 && !unresolvedList.isEmpty()) {
                        unresolved.append(CoordELFunctions.INSTANCE_SEPARATOR);
                    }
                    unresolved.append(unresolvedList);
                }
            }
        }
        return unresolved.toString();
    }

    @Override
    protected void generateDependencies() {
        super.generateDependencies();
    }

    /**
     * Java serialization hook: writes only the unresolved-dependency map.
     */
    private void writeObject(ObjectOutputStream os) throws IOException, ClassNotFoundException {
        os.writeObject(unResolvedList);
    }

    /**
     * Java serialization hook: reads the unresolved-dependency map, then calls
     * {@link #generateDependencies()}.
     */
    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        unResolvedList = (Map<String, CoordUnResolvedInputDependency>) in.readObject();
        generateDependencies();
    }

    /**
     * @return true when both the resolved and the unresolved dependencies are met
     */
    @Override
    public boolean isDependencyMet() {
        return isResolvedDependencyMeet() && isUnResolvedDependencyMet();
    }

    /**
     * @return the superclass's notion of "dependency met", i.e. without the unresolved dependencies
     */
    public boolean isResolvedDependencyMeet() {
        return super.isDependencyMet();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        super.write(out);
        WritableUtils.writeMap(out, unResolvedList);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        unResolvedList = WritableUtils.readMap(in, CoordUnResolvedInputDependency.class);
    }

    @Override
    public void setMissingDependencies(String join) {
        // We don't have to set this for input logic. Dependency map will have computed missing dependencies
    }

    /**
     * @param dataSet the dataset name
     * @return a new list holding the superclass's available dependencies for the dataset, followed by the
     *         resolved entries of its unresolved dependency, if one is registered
     */
    @Override
    public List<String> getAvailableDependencies(String dataSet) {
        List<String> availableList = new ArrayList<String>();
        availableList.addAll(super.getAvailableDependencies(dataSet));
        if (getUnResolvedDependency(dataSet) != null) {
            availableList.addAll(getUnResolvedDependency(dataSet).getResolvedList());
        }
        return availableList;
    }

    /**
     * @param dataSet the dataset name
     * @return the resolved flag of the dataset's unresolved dependency when one is registered; otherwise
     *         whatever the superclass reports
     */
    @Override
    public boolean isDataSetResolved(String dataSet) {
        if (unResolvedList.containsKey(dataSet)) {
            return unResolvedList.get(dataSet).isResolved();
        }
        else {
            return super.isDataSetResolved(dataSet);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPushInputDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPushInputDependency.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPushInputDependency.java new file mode 100644 index 0000000..e19e799 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPushInputDependency.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.coord.input.dependency; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class CoordPushInputDependency extends AbstractCoordInputDependency { + + public CoordPushInputDependency() { + super(); + } + + @Override + public void setMissingDependencies(String join) { + } + + @Override + public void addUnResolvedList(String name, String tmpUnresolved) { + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordUnResolvedInputDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordUnResolvedInputDependency.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordUnResolvedInputDependency.java new file mode 100644 index 0000000..096b588 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordUnResolvedInputDependency.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.oozie.coord.input.dependency;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Writable;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.util.WritableUtils;

/**
 * Holds the unresolved dependencies of one dataset, the entries resolved so far, and a flag telling whether
 * resolution is complete.
 */
public class CoordUnResolvedInputDependency implements Writable {

    private boolean isResolved;
    private List<String> dependency = new ArrayList<String>();
    private List<String> resolvedList = new ArrayList<String>();

    /**
     * Creates an empty, not yet resolved dependency.
     */
    public CoordUnResolvedInputDependency() {
    }

    /**
     * Creates a not yet resolved dependency over the given list. The list is stored as-is, not copied.
     *
     * @param dependency the unresolved dependencies
     */
    public CoordUnResolvedInputDependency(List<String> dependency) {
        this.dependency = dependency;
    }

    public boolean isResolved() {
        return isResolved;
    }

    public void setResolved(boolean isResolved) {
        this.isResolved = isResolved;
    }

    public List<String> getDependencies() {
        return dependency;
    }

    public List<String> getResolvedList() {
        return resolvedList;
    }

    /**
     * Replaces the resolved list with the given list (stored as-is, not copied).
     */
    public void setResolvedList(List<String> resolvedList) {
        this.resolvedList = resolvedList;
    }

    /**
     * Appends all given entries to the current resolved list.
     */
    public void addResolvedList(List<String> resolvedList) {
        this.resolvedList.addAll(resolvedList);
    }

    /**
     * @return the dependencies joined with {@link CoordELFunctions#INSTANCE_SEPARATOR} while unresolved, or
     *         an empty string once resolved
     */
    public String getUnResolvedList() {
        return isResolved ? "" : StringUtils.join(dependency, CoordELFunctions.INSTANCE_SEPARATOR);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Order matters: readFields consumes flag, dependencies, resolved list in the same sequence.
        out.writeBoolean(isResolved);
        WritableUtils.writeStringList(out, dependency);
        WritableUtils.writeStringList(out, resolvedList);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        isResolved = in.readBoolean();
        dependency = WritableUtils.readStringList(in);
        resolvedList = WritableUtils.readStringList(in);
    }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicBuilder.java b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicBuilder.java new file mode 100644 index 0000000..2326cd7 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicBuilder.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.coord.input.logic; + +import java.io.IOException; + +import org.apache.commons.lang.StringUtils; + +public class CoordInputLogicBuilder { + + StringBuffer bf = new StringBuffer(); + + CoordInputLogicEvaluator coordInputlogicEvaluator; + + /** The Dependency builder.
*/
    public CoordInputDependencyBuilder dependencyBuilder;

    /**
     * Creates a builder bound to the given evaluator and initialises {@link #dependencyBuilder} with the same
     * evaluator.
     *
     * @param coordInputlogicEvaluator the evaluator that carries out the actual input checks
     */
    public CoordInputLogicBuilder(CoordInputLogicEvaluator coordInputlogicEvaluator) {
        this.coordInputlogicEvaluator = coordInputlogicEvaluator;
        dependencyBuilder = new CoordInputDependencyBuilder(coordInputlogicEvaluator);
    }

    /**
     * Input function of input-logic. Evaluates a single dataset with min and wait both passed as -1.
     *
     * @param inputDataset the dataset
     * @return the evaluation result
     */
    public CoordInputLogicEvaluatorResult input(String inputDataset) {
        return coordInputlogicEvaluator.evalInput(inputDataset, -1, -1);
    }

    /**
     * Combine function of input-logic. Evaluates several datasets together with min and wait both passed
     * as -1.
     *
     * @param combineDatasets the datasets to combine
     * @return the evaluation result
     */
    public CoordInputLogicEvaluatorResult combine(String... combineDatasets) {
        return coordInputlogicEvaluator.evalCombineInput(combineDatasets, -1, -1);
    }

    /**
     * Fluent builder that collects min, wait and the dataset(s), then evaluates them in {@link #build()}.
     */
    public static class CoordInputDependencyBuilder {

        CoordInputLogicEvaluator coordInputLogicEvaluator;

        public CoordInputDependencyBuilder(CoordInputLogicEvaluator coordInputLogicEvaluator) {
            this.coordInputLogicEvaluator = coordInputLogicEvaluator;
        }

        private int minValue = -1;
        private String wait;
        private String inputDataset;
        private String[] combineDatasets;

        /**
         * Construct min function
         *
         * @param minValue the min value
         * @return the coord input dependency builder
         */
        public CoordInputDependencyBuilder min(int minValue) {
            this.minValue = minValue;
            return this;
        }

        /**
         * Construct wait function
         *
         * @param wait the wait, as a string of digits
         * @return the coord input dependency builder
         */
        public CoordInputDependencyBuilder inputWait(String wait) {
            this.wait = wait;
            return this;
        }

        /**
         * Construct wait function
         *
         * @param wait the wait
         * @return the coord input dependency builder
         */
        public CoordInputDependencyBuilder inputWait(int wait) {
            this.wait = String.valueOf(wait);
            return this;
        }

        /**
         * Construct input function
         *
         * @param dataset the input
         * @return the coord input dependency builder
         */
        public CoordInputDependencyBuilder input(String dataset) {
            this.inputDataset = dataset;
            return this;
        }

        /**
         * Construct combine function
         *
         * @param combineDatasets the combine
         * @return the coord input dependency builder
         */
        public CoordInputDependencyBuilder combine(String... combineDatasets) {
            this.combineDatasets = combineDatasets;
            return this;
        }

        /**
         * Evaluates the collected input-logic expression. When combine datasets were set they take precedence
         * over a single input dataset.
         *
         * @return the evaluation result
         * @throws IOException if the wait value is not a usable number
         */
        public CoordInputLogicEvaluatorResult build() throws IOException {
            if (combineDatasets != null) {
                return coordInputLogicEvaluator.evalCombineInput(combineDatasets, minValue, getTime(wait));
            }
            else {
                return coordInputLogicEvaluator.evalInput(inputDataset, minValue, getTime(wait));
            }
        }

        /**
         * Gets the time in min.
         *
         * @param value the value; null or empty means "not set"
         * @return the parsed value, or -1 when the value is null or empty
         * @throws IOException if the value is not made of digits only, or does not fit into an int
         */
        private int getTime(String value) throws IOException {
            if (StringUtils.isEmpty(value)) {
                return -1;
            }
            if (!StringUtils.isNumeric(value)) {
                throw new IOException("Unsupported time : " + value);
            }
            try {
                return Integer.parseInt(value);
            }
            catch (NumberFormatException e) {
                // An all-digit string can still overflow int; report it through the declared IOException
                // rather than letting the unchecked exception escape.
                throw new IOException("Unsupported time : " + value, e);
            }
        }
    }

}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluator.java b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluator.java new file mode 100644 index 0000000..c495570 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluator.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.oozie.coord.input.logic;

/**
 * Evaluates the two input-logic primitives: a single dataset and a combination of datasets.
 */
public interface CoordInputLogicEvaluator {

    /** Name constant {@code "input-logic"}. */
    String INPUT_LOGIC = "input-logic";

    /**
     * Evaluates a single input dataset.
     *
     * @param inputDataSet the input data set
     * @param min the min
     * @param wait the wait
     * @return the coord input logic evaluator result
     */
    CoordInputLogicEvaluatorResult evalInput(String inputDataSet, int min, int wait);

    /**
     * Evaluates several datasets combined.
     *
     * @param combineDatasets the combine datasets
     * @param min the min
     * @param wait the wait
     * @return the coord input logic evaluator result
     */
    CoordInputLogicEvaluatorResult evalCombineInput(String[] combineDatasets, int min, int wait);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java new file mode 100644 index 0000000..f54d305 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java @@ -0,0 +1,324 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.oozie.coord.input.logic; + +import java.io.IOException; +import java.io.StringReader; +import java.net.URISyntaxException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.command.coord.CoordCommandUtils; +import org.apache.oozie.coord.input.dependency.AbstractCoordInputDependency; +import org.apache.oozie.coord.input.dependency.CoordInputDependency; +import org.apache.oozie.coord.input.dependency.CoordInputInstance; +import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorResult.STATUS; +import org.apache.oozie.dependency.URIHandlerException; +import org.apache.oozie.util.LogUtils; +import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.util.XLog; + +/** + * PhaseOne is for all dependencies check, except unresolved. Unresolved will be checked as part of phaseTwo. + * Phasethree is only to get dependencies from dataset, no hdfs/hcat check. 
+ */ +public class CoordInputLogicEvaluatorPhaseOne implements CoordInputLogicEvaluator { + + protected AbstractCoordInputDependency coordInputDependency; + protected Map<String, List<CoordInputInstance>> dependencyMap; + protected CoordinatorActionBean coordAction = null; + protected XLog log = XLog.getLog(getClass()); + + public CoordInputLogicEvaluatorPhaseOne(CoordinatorActionBean coordAction) { + this(coordAction, coordAction.getPullInputDependencies()); + } + + public CoordInputLogicEvaluatorPhaseOne(CoordinatorActionBean coordAction, CoordInputDependency coordInputDependency) { + this.coordAction = coordAction; + this.coordInputDependency = (AbstractCoordInputDependency) coordInputDependency; + dependencyMap = ((AbstractCoordInputDependency) coordInputDependency).getDependencyMap(); + LogUtils.setLogInfo(coordAction.getId()); + + } + + public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) { + return input(coordInputDependency, dataSet, min, wait); + + } + + /** + * Evaluate input function with min and wait + * + * @param coordInputDependency + * @param dataSet + * @param min + * @param wait + * @return the coord input logic evaluator result + */ + public CoordInputLogicEvaluatorResult input(AbstractCoordInputDependency coordInputDependency, String dataSet, + int min, int wait) { + + List<String> availableList = new ArrayList<String>(); + if (coordInputDependency.getDependencyMap().get(dataSet) == null) { + CoordInputLogicEvaluatorResult retData = new CoordInputLogicEvaluatorResult(); + if (coordInputDependency.getAvailableDependencies(dataSet) == null + || coordInputDependency.getAvailableDependencies(dataSet).isEmpty()) { + log.debug("Data set [{0}] is unresolved set, will get resolved in phasetwo", dataSet); + retData.setStatus(CoordInputLogicEvaluatorResult.STATUS.PHASE_TWO_EVALUATION); + } + else { + return getResultFromPullPush(coordAction, dataSet, min); + } + return retData; + } + boolean allFound = true; + try { + 
Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf())); + List<CoordInputInstance> firstInputSetList = coordInputDependency.getDependencyMap().get(dataSet); + for (int i = 0; i < firstInputSetList.size(); i++) { + CoordInputInstance coordInputInstance = firstInputSetList.get(i); + if (!coordInputInstance.isAvailable()) { + if (pathExists(coordInputInstance.getInputDataInstance(), actionConf)) { + availableList.add(coordInputInstance.getInputDataInstance()); + coordInputDependency.addToAvailableDependencies(dataSet, coordInputInstance); + } + else { + log.debug("[{0} is not found ", coordInputInstance.getInputDataInstance()); + allFound = false; + // Stop looking for dependencies, if min is not specified. + if (min < 0) { + break; + } + } + } + else { + availableList.add(coordInputInstance.getInputDataInstance()); + } + } + } + catch (Exception e) { + log.error(e); + throw new RuntimeException(ErrorCode.E1028.format("Error executing input function " + e.getMessage())); + } + CoordInputLogicEvaluatorResult retData = getEvalResult(allFound, min, wait, availableList); + + log.debug("Resolved status of Data set [{0}] with min [{1}] and wait [{2}] = [{3}]", dataSet, min, wait, + retData.getStatus()); + return retData; + } + + public boolean isInputWaitElapsed(int timeInMin) { + + if (timeInMin == -1) { + return true; + } + long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction + .getCreatedTime().getTime())) + / (60 * 1000); + return timeInMin <= waitingTime; + } + + public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) { + return combine(coordInputDependency, inputSets, min, wait); + } + + public CoordInputLogicEvaluatorResult combine(AbstractCoordInputDependency coordInputDependency, + String[] inputSets, int min, int wait) { + + List<String> availableList = new ArrayList<String>(); + + if (coordInputDependency.getDependencyMap().get(inputSets[0]) 
== null) { + return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.TIMED_WAITING); + } + + try { + + Configuration jobConf = new XConfiguration(new StringReader(coordAction.getRunConf())); + String firstInputSet = inputSets[0]; + List<CoordInputInstance> firstInputSetList = coordInputDependency.getDependencyMap().get(firstInputSet); + for (int i = 0; i < firstInputSetList.size(); i++) { + CoordInputInstance coordInputInstance = firstInputSetList.get(i); + boolean found = false; + if (!coordInputInstance.isAvailable()) { + if (!pathExists(coordInputInstance.getInputDataInstance(), jobConf)) { + log.debug(MessageFormat.format("{0} is not found. Looking from other datasets.", + coordInputInstance.getInputDataInstance())); + for (int j = 1; j < inputSets.length; j++) { + if (!coordInputDependency.getDependencyMap().get(inputSets[j]).get(i).isAvailable()) { + if (pathExists(coordInputDependency.getDependencyMap().get(inputSets[j]).get(i) + .getInputDataInstance(), jobConf)) { + coordInputDependency.addToAvailableDependencies(inputSets[j], coordInputDependency + .getDependencyMap().get(inputSets[j]).get(i)); + availableList.add(coordInputDependency.getDependencyMap().get(inputSets[j]).get(i) + .getInputDataInstance()); + log.debug(MessageFormat.format("{0} is found.", + coordInputInstance.getInputDataInstance())); + found = true; + } + + } + else { + coordInputDependency.addToAvailableDependencies(inputSets[j], coordInputDependency + .getDependencyMap().get(inputSets[j]).get(i)); + availableList.add(coordInputDependency.getDependencyMap().get(inputSets[j]).get(i) + .getInputDataInstance()); + found = true; + + } + } + } + else { + coordInputDependency.addToAvailableDependencies(firstInputSet, coordInputInstance); + availableList.add(coordInputInstance.getInputDataInstance()); + found = true; + } + } + else { + availableList.add(coordInputInstance.getInputDataInstance()); + found = true; + } + + if (min < 0 && !found) { + // Stop looking for 
dependencies, if min is not specified. + break; + } + + } + } + catch (Exception e) { + log.error(e); + throw new RuntimeException(ErrorCode.E1028.format("Error executing combine function " + e.getMessage())); + } + boolean allFound = availableList.size() == coordInputDependency.getDependencyMap().get(inputSets[0]).size(); + CoordInputLogicEvaluatorResult retData = getEvalResult(allFound, min, wait, availableList); + log.debug("Resolved status of Data set [{0}] with min [{1}] and wait [{2}] = [{3}]", + Arrays.toString(inputSets), min, wait, retData.getStatus()); + return retData; + + } + + public Configuration getConf() throws IOException { + return new XConfiguration(new StringReader(coordAction.getRunConf())); + + } + + public String getListAsString(List<String> list, String dataset) { + if (list == null || list.isEmpty()) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (int i = 1; i < list.size(); i++) { + sb.append(list.get(i - 1)).append(","); + } + sb.append(list.get(list.size() - 1)); + return sb.toString(); + } + + protected CoordInputLogicEvaluatorResult getEvalResult(boolean found, int min, int wait, List<String> availableList) { + CoordInputLogicEvaluatorResult retData = new CoordInputLogicEvaluatorResult(); + if (!found && wait > 0) { + if (!isInputWaitElapsed(wait)) { + return new CoordInputLogicEvaluatorResult(STATUS.TIMED_WAITING); + } + } + + if (found || (min > 0 && availableList.size() >= min)) { + retData.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE); + retData.setDataSets(getListAsString(availableList, null)); + } + + if (min == 0) { + retData.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE); + } + + return retData; + } + + protected boolean pathExists(String sPath, Configuration jobConf) throws IOException, URISyntaxException, + URIHandlerException { + return CoordCommandUtils.pathExists(sPath, jobConf); + + } + + public CoordInputLogicEvaluatorResult getResultFromPullPush(CoordinatorActionBean coordAction, 
String dataSet, int min) { + CoordInputLogicEvaluatorResult result = new CoordInputLogicEvaluatorResult(); + CoordInputLogicEvaluatorResult pullResult = getEvalResult( + (AbstractCoordInputDependency) coordAction.getPullInputDependencies(), dataSet, min); + CoordInputLogicEvaluatorResult pushResult = getEvalResult( + (AbstractCoordInputDependency) coordAction.getPushInputDependencies(), dataSet, min); + result.appendDataSets(pullResult.getDataSets()); + result.appendDataSets(pushResult.getDataSets()); + + if (pullResult.isWaiting() || pushResult.isWaiting()) { + result.setStatus(STATUS.TIMED_WAITING); + } + + else if (pullResult.isPhaseTwoEvaluation() || pushResult.isPhaseTwoEvaluation()) { + result.setStatus(STATUS.PHASE_TWO_EVALUATION); + } + + else if (pullResult.isTrue() || pushResult.isTrue()) { + result.setStatus(STATUS.TRUE); + } + else { + result.setStatus(STATUS.FALSE); + } + return result; + + } + + /** + * Gets evaluator Result + * + * @param coordInputDependencies the coord dependencies + * @param dataSet the data set + * @param min the min + * @return the coord input logic evaluator result + */ + public CoordInputLogicEvaluatorResult getEvalResult(AbstractCoordInputDependency coordInputDependencies, + String dataSet, int min) { + CoordInputLogicEvaluatorResult result = new CoordInputLogicEvaluatorResult(); + if ((coordInputDependencies.getAvailableDependencies(dataSet) == null || coordInputDependencies + .getAvailableDependencies(dataSet).isEmpty())) { + if (min == 0) { + result.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE); + } + else { + result.setStatus(CoordInputLogicEvaluatorResult.STATUS.FALSE); + } + } + + if (min > -1 && coordInputDependencies.getAvailableDependencies(dataSet).size() >= min) { + result.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE); + result.appendDataSets(getListAsString(coordInputDependencies.getAvailableDependencies(dataSet), dataSet)); + } + + else if (coordInputDependencies.isDataSetResolved(dataSet)) { + 
result.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE); + result.appendDataSets(getListAsString(coordInputDependencies.getAvailableDependencies(dataSet), dataSet)); + } + return result; + } +}
