http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java 
b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
index 5d23866..ffa0943 100644
--- a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
+++ b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
@@ -19,11 +19,13 @@
 package org.apache.oozie.coord;
 
 import com.google.common.collect.Lists;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.CommandException;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
 import org.apache.oozie.dependency.URIHandler;
 import org.apache.oozie.dependency.URIHandler.Context;
 import org.apache.oozie.service.Services;
@@ -32,6 +34,7 @@ import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluator;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
+import org.jdom.JDOMException;
 
 import java.net.URI;
 import java.util.ArrayList;
@@ -61,7 +64,6 @@ public class CoordELFunctions {
     public static final long DAY_MSEC = 24 * HOUR_MSEC;
     public static final long MONTH_MSEC = 30 * DAY_MSEC;
     public static final long YEAR_MSEC = 365 * DAY_MSEC;
-
     /**
      * Used in defining the frequency in 'day' unit. <p> domain: <code> val 
&gt; 0</code> and should be integer.
      *
@@ -348,7 +350,7 @@ public class CoordELFunctions {
                             
resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
                             resolvedURIPaths.append(uriPath);
                             retVal = resolvedInstances.toString();
-                            eval.setVariable("resolved_path", 
resolvedURIPaths.toString());
+                            eval.setVariable(CoordELConstants.RESOLVED_PATH, 
resolvedURIPaths.toString());
                             break;
                         }
                         else if (available >= startOffset) {
@@ -356,6 +358,7 @@ public class CoordELFunctions {
                             
resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
                                     INSTANCE_SEPARATOR);
                             
resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+
                         }
                         available++;
                     }
@@ -366,6 +369,10 @@ public class CoordELFunctions {
                     checkedInstance++;
                     // DateUtils.moveToEnd(nominalInstanceCal, 
getDSEndOfFlag());
                 }
+                if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && 
eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) {
+                    eval.setVariable(CoordELConstants.RESOLVED_PATH, 
resolvedURIPaths.toString());
+                }
+
             }
             finally {
                 if (uriContext != null) {
@@ -375,7 +382,7 @@ public class CoordELFunctions {
             if (!resolved) {
                 // return unchanged future function with variable 'is_resolved'
                 // to 'false'
-                eval.setVariable("is_resolved", Boolean.FALSE);
+                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
                 if (startOffset == endOffset) {
                     retVal = "${coord:future(" + startOffset + ", " + instance 
+ ")}";
                 }
@@ -384,11 +391,11 @@ public class CoordELFunctions {
                 }
             }
             else {
-                eval.setVariable("is_resolved", Boolean.TRUE);
+                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
             }
         }
         else {// No feasible nominal time
-            eval.setVariable("is_resolved", Boolean.TRUE);
+            eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
             retVal = "";
         }
         return retVal;
@@ -495,8 +502,24 @@ public class CoordELFunctions {
     public static String ph3_coord_dataIn(String dataInName) {
         String uris = "";
         ELEvaluator eval = ELEvaluator.getCurrent();
+        if (eval.getVariable(".datain." + dataInName) == null
+                && 
!StringUtils.isEmpty(eval.getVariable(".actionInputLogic").toString())) {
+            try {
+                return new 
CoordInputLogicEvaluatorUtil().getInputDependencies(dataInName,
+                        (SyncCoordAction) eval.getVariable(COORD_ACTION));
+            }
+            catch (JDOMException e) {
+                XLog.getLog(CoordELFunctions.class).error(e);
+                throw new RuntimeException(e.getMessage());
+            }
+        }
+
         uris = (String) eval.getVariable(".datain." + dataInName);
-        Boolean unresolved = (Boolean) eval.getVariable(".datain." + 
dataInName + ".unresolved");
+        Object unResolvedObj = eval.getVariable(".datain." + dataInName + 
".unresolved");
+        if (unResolvedObj == null) {
+            return uris;
+        }
+        Boolean unresolved = Boolean.parseBoolean(unResolvedObj.toString());
         if (unresolved != null && unresolved.booleanValue() == true) {
             return "${coord:dataIn('" + dataInName + "')}";
         }
@@ -835,7 +858,7 @@ public class CoordELFunctions {
     public static String ph1_coord_dataIn_echo(String n) {
         ELEvaluator eval = ELEvaluator.getCurrent();
         String val = (String) eval.getVariable("oozie.dataname." + n);
-        if (val == null || val.equals("data-in") == false) {
+        if ((val == null || val.equals("data-in") == false)) {
             XLog.getLog(CoordELFunctions.class).error("data_in_name " + n + " 
is not valid");
             throw new RuntimeException("data_in_name " + n + " is not valid");
         }
@@ -1112,7 +1135,8 @@ public class CoordELFunctions {
                             
resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
                             resolvedURIPaths.append(uriPath);
                             retVal = resolvedInstances.toString();
-                            eval.setVariable("resolved_path", 
resolvedURIPaths.toString());
+                            eval.setVariable(CoordELConstants.RESOLVED_PATH, 
resolvedURIPaths.toString());
+
                             break;
                         }
                         else if (available <= endOffset) {
@@ -1130,6 +1154,9 @@ public class CoordELFunctions {
                     nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), 
instCount[0] * datasetFrequency);
                     // DateUtils.moveToEnd(nominalInstanceCal, 
getDSEndOfFlag());
                 }
+                if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && 
eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) {
+                    eval.setVariable(CoordELConstants.RESOLVED_PATH, 
resolvedURIPaths.toString());
+                }
             }
             finally {
                 if (uriContext != null) {
@@ -1139,7 +1166,7 @@ public class CoordELFunctions {
             if (!resolved) {
                 // return unchanged latest function with variable 'is_resolved'
                 // to 'false'
-                eval.setVariable("is_resolved", Boolean.FALSE);
+                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
                 if (startOffset == endOffset) {
                     retVal = "${coord:latest(" + startOffset + ")}";
                 }
@@ -1148,11 +1175,11 @@ public class CoordELFunctions {
                 }
             }
             else {
-                eval.setVariable("is_resolved", Boolean.TRUE);
+                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
             }
         }
         else {// No feasible nominal time
-            eval.setVariable("is_resolved", Boolean.FALSE);
+            eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
         }
         return retVal;
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/CoordUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/CoordUtils.java 
b/core/src/main/java/org/apache/oozie/coord/CoordUtils.java
index 94c6974..82f9bed 100644
--- a/core/src/main/java/org/apache/oozie/coord/CoordUtils.java
+++ b/core/src/main/java/org/apache/oozie/coord/CoordUtils.java
@@ -38,6 +38,8 @@ import org.apache.oozie.XException;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.command.CommandException;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
+import org.apache.oozie.coord.input.logic.InputLogicParser;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import 
org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -51,7 +53,9 @@ import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.Pair;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
+import org.jdom.JDOMException;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -414,4 +418,22 @@ public class CoordUtils {
         }
         return params;
     }
+
+    public static boolean isInputLogicSpecified(String actionXml) throws 
JDOMException {
+        return isInputLogicSpecified(XmlUtils.parseXml(actionXml));
+    }
+
+    public static boolean isInputLogicSpecified(Element eAction) throws 
JDOMException {
+        return eAction.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, 
eAction.getNamespace()) != null;
+    }
+
+    public static String getInputLogic(String actionXml) throws JDOMException {
+        return getInputLogic(XmlUtils.parseXml(actionXml));
+    }
+
+    public static String getInputLogic(Element actionXml) throws JDOMException 
{
+        return new 
InputLogicParser().parse(actionXml.getChild(CoordInputLogicEvaluator.INPUT_LOGIC,
+                actionXml.getNamespace()));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/SyncCoordAction.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/SyncCoordAction.java 
b/core/src/main/java/org/apache/oozie/coord/SyncCoordAction.java
index 44258eb..5f6d7a8 100644
--- a/core/src/main/java/org/apache/oozie/coord/SyncCoordAction.java
+++ b/core/src/main/java/org/apache/oozie/coord/SyncCoordAction.java
@@ -20,6 +20,7 @@ package org.apache.oozie.coord;
 
 import java.util.Date;
 import java.util.TimeZone;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
 
 /**
  * This class represents a Coordinator action.
@@ -34,6 +35,10 @@ public class SyncCoordAction {
     private TimeUnit timeUnit;
     private TimeUnit endOfDuration; // End of Month or End of Days
 
+    private CoordInputDependency pullDependencies;
+    private CoordInputDependency pushDependencies;
+
+
     public String getActionId() {
         return this.actionId;
     }
@@ -110,4 +115,21 @@ public class SyncCoordAction {
         this.endOfDuration = endOfDuration;
     }
 
+    public CoordInputDependency getPullDependencies() {
+        return pullDependencies;
+    }
+
+    public void setPullDependencies(CoordInputDependency pullDependencies) {
+        this.pullDependencies = pullDependencies;
+    }
+
+    public CoordInputDependency getPushDependencies() {
+        return pushDependencies;
+    }
+
+    public void setPushDependencies(CoordInputDependency pushDependencies) {
+        this.pushDependencies = pushDependencies;
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java
 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java
new file mode 100644
index 0000000..0da60ec
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.dependency;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.coord.CoordCommandUtils;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
+import org.apache.oozie.dependency.ActionDependency;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.WritableUtils;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+
+public abstract class AbstractCoordInputDependency implements Writable, 
CoordInputDependency {
+    protected boolean isDependencyMet = false;
+    /*
+     * Transient variables only used for processing, not stored in DB.
+     */
+    protected transient Map<String, List<String>> missingDependenciesSet = new 
HashMap<String, List<String>>();
+    protected transient Map<String, List<String>> availableDependenciesSet = 
new HashMap<String, List<String>>();
+    protected Map<String, List<CoordInputInstance>> dependencyMap = new 
HashMap<String, List<CoordInputInstance>>();
+
+    public AbstractCoordInputDependency() {
+    }
+
+
+    public AbstractCoordInputDependency(Map<String, List<CoordInputInstance>> 
dependencyMap) {
+        this.dependencyMap = dependencyMap;
+        generateDependencies();
+    }
+
+    public void addInputInstanceList(String inputEventName, 
List<CoordInputInstance> inputInstanceList) {
+        dependencyMap.put(inputEventName, inputInstanceList);
+    }
+
+    public Map<String, List<CoordInputInstance>> getDependencyMap() {
+        return dependencyMap;
+    }
+
+    public void setDependencyMap(Map<String, List<CoordInputInstance>> 
dependencyMap) {
+        this.dependencyMap = dependencyMap;
+    }
+
+    public void addToAvailableDependencies(String dataSet, CoordInputInstance 
coordInputInstance) {
+        coordInputInstance.setAvailability(true);
+        List<String> availableSet = availableDependenciesSet.get(dataSet);
+        if (availableSet == null) {
+            availableSet = new ArrayList<String>();
+            availableDependenciesSet.put(dataSet, availableSet);
+        }
+        availableSet.add(coordInputInstance.getInputDataInstance());
+        removeFromMissingDependencies(dataSet, coordInputInstance);
+    }
+
+    public void removeFromMissingDependencies(String dataSet, 
CoordInputInstance coordInputInstance) {
+        coordInputInstance.setAvailability(true);
+        List<String> missingSet = missingDependenciesSet.get(dataSet);
+        if (missingSet != null) {
+            missingSet.remove(coordInputInstance.getInputDataInstance());
+            if (missingSet.isEmpty()) {
+                missingDependenciesSet.remove(dataSet);
+            }
+        }
+
+    }
+
+    public void addToMissingDependencies(String dataSet, CoordInputInstance 
coordInputInstance) {
+        List<String> availableSet = missingDependenciesSet.get(dataSet);
+        if (availableSet == null) {
+            availableSet = new ArrayList<String>();
+        }
+        availableSet.add(coordInputInstance.getInputDataInstance());
+        missingDependenciesSet.put(dataSet, availableSet);
+
+    }
+
+    protected void generateDependencies() {
+        try {
+            missingDependenciesSet = new HashMap<String, List<String>>();
+            availableDependenciesSet = new HashMap<String, List<String>>();
+
+            Set<String> keySets = dependencyMap.keySet();
+            for (String key : keySets) {
+                for (CoordInputInstance coordInputInstance : 
dependencyMap.get(key))
+                    if (coordInputInstance.isAvailable()) {
+                        addToAvailableDependencies(key, coordInputInstance);
+                    }
+                    else {
+                        addToMissingDependencies(key, coordInputInstance);
+                    }
+            }
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public List<String> getAvailableDependencies(String dataSet) {
+        if (availableDependenciesSet.get(dataSet) != null) {
+            return availableDependenciesSet.get(dataSet);
+        }
+        else {
+            return new ArrayList<String>();
+        }
+
+    }
+
+    public String getMissingDependencies(String dataSet) {
+        StringBuilder sb = new StringBuilder();
+        for (String dependencies : missingDependenciesSet.get(dataSet)) {
+            sb.append(dependencies).append("#");
+        }
+        return sb.toString();
+    }
+
+    public void addToAvailableDependencies(String dataSet, String 
availableSet) {
+        List<CoordInputInstance> list = dependencyMap.get(dataSet);
+        if (list == null) {
+            list = new ArrayList<CoordInputInstance>();
+            dependencyMap.put(dataSet, list);
+        }
+
+        for (String available : 
availableSet.split(CoordELFunctions.INSTANCE_SEPARATOR)) {
+            CoordInputInstance coordInstance = new 
CoordInputInstance(available, true);
+            list.add(coordInstance);
+            addToAvailableDependencies(dataSet, coordInstance);
+        }
+
+    }
+
+    public String getMissingDependencies() {
+        StringBuilder sb = new StringBuilder();
+        if (missingDependenciesSet != null) {
+            for (List<String> dependenciesList : 
missingDependenciesSet.values()) {
+                for (String dependencies : dependenciesList) {
+                    sb.append(dependencies).append("#");
+                }
+            }
+        }
+        return sb.toString();
+    }
+
+    public List<String> getMissingDependenciesAsList() {
+        List<String> missingDependencies = new ArrayList<String>();
+        for (List<String> dependenciesList : missingDependenciesSet.values()) {
+            missingDependencies.addAll(dependenciesList);
+        }
+        return missingDependencies;
+    }
+
+    public List<String> getAvailableDependenciesAsList() {
+        List<String> availableDependencies = new ArrayList<String>();
+        for (List<String> dependenciesList : 
availableDependenciesSet.values()) {
+            availableDependencies.addAll(dependenciesList);
+
+        }
+        return availableDependencies;
+    }
+
+    public String serialize() throws IOException {
+        return CoordInputDependencyFactory.getMagicNumber()
+                + new String(WritableUtils.toByteArray(this), 
CoordInputDependencyFactory.CHAR_ENCODING);
+
+    }
+
+    public String getListAsString(List<String> dataSets) {
+        StringBuilder sb = new StringBuilder();
+        for (String dependencies : dataSets) {
+            sb.append(dependencies).append("#");
+        }
+
+        return sb.toString();
+    }
+
+    public void setDependencyMet(boolean isDependencyMeet) {
+        this.isDependencyMet = isDependencyMeet;
+    }
+
+    public boolean isDependencyMet() {
+        return missingDependenciesSet.isEmpty() || isDependencyMet;
+    }
+
+    public boolean isUnResolvedDependencyMet() {
+        return false;
+    }
+
+
+    @Override
+    public void addToAvailableDependencies(Collection<String> availableList) {
+        for (Entry<String, List<CoordInputInstance>> dependenciesList : 
dependencyMap.entrySet()) {
+            for (CoordInputInstance coordInputInstance : 
dependenciesList.getValue()) {
+                if 
(availableList.contains(coordInputInstance.getInputDataInstance()))
+                    addToAvailableDependencies(dependenciesList.getKey(), 
coordInputInstance);
+            }
+        }
+    }
+
+    @Override
+    public ActionDependency checkPushMissingDependencies(CoordinatorActionBean 
coordAction,
+            boolean registerForNotification) throws CommandException, 
IOException,
+            JDOMException {
+        boolean status = new 
CoordInputLogicEvaluatorUtil(coordAction).checkPushDependencies();
+        if (status) {
+            coordAction.getPushInputDependencies().setDependencyMet(true);
+        }
+        return new 
ActionDependency(coordAction.getPushInputDependencies().getMissingDependenciesAsList(),
 coordAction
+                .getPushInputDependencies().getAvailableDependenciesAsList());
+
+    }
+
+    public boolean checkPullMissingDependencies(CoordinatorActionBean 
coordAction,
+            StringBuilder existList, StringBuilder nonExistList) throws 
IOException, JDOMException {
+        boolean status = new 
CoordInputLogicEvaluatorUtil(coordAction).checkPullMissingDependencies();
+        if (status) {
+            coordAction.getPullInputDependencies().setDependencyMet(true);
+        }
+        return status;
+
+    }
+
+    public boolean isChangeInDependency(StringBuilder nonExistList, String 
missingDependencies,
+            StringBuilder nonResolvedList, boolean status) {
+        if (!StringUtils.isEmpty(missingDependencies)) {
+            return !missingDependencies.equals(getMissingDependencies());
+        }
+        else {
+            return true;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public boolean checkUnresolved(CoordinatorActionBean coordAction, Element 
eAction)
+            throws Exception {
+        String actualTimeStr = eAction.getAttributeValue("action-actual-time");
+        Element inputList = eAction.getChild("input-events", 
eAction.getNamespace());
+        Date actualTime = null;
+        if (actualTimeStr == null) {
+            actualTime = new Date();
+        }
+        else {
+            actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
+        }
+        if (inputList == null) {
+            return true;
+        }
+        List<Element> eDataEvents = inputList.getChildren("data-in", 
eAction.getNamespace());
+        for (Element dEvent : eDataEvents) {
+            if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, 
dEvent.getNamespace()) == null) {
+                continue;
+            }
+            String unResolvedInstance = 
dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG,
+                    dEvent.getNamespace()).getTextTrim();
+            String name = dEvent.getAttribute("name").getValue();
+            addUnResolvedList(name, unResolvedInstance);
+        }
+        return new 
CoordInputLogicEvaluatorUtil(coordAction).checkUnResolved(actualTime);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        WritableUtils.writeStringAsBytes(out,INTERNAL_VERSION_ID);
+        out.writeBoolean(isDependencyMet);
+        WritableUtils.writeMapWithList(out, dependencyMap);
+
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        WritableUtils.readBytesAsString(in);
+        this.isDependencyMet = in.readBoolean();
+        dependencyMap = WritableUtils.readMapWithList(in, 
CoordInputInstance.class);
+        generateDependencies();
+    }
+
+    public boolean isDataSetResolved(String dataSet){
+        if(getAvailableDependencies(dataSet) ==null|| 
getDependencyMap().get(dataSet) == null){
+            return false;
+        }
+        return getAvailableDependencies(dataSet).size() == 
getDependencyMap().get(dataSet).size();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java
 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java
new file mode 100644
index 0000000..cf0edd0
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependency.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.dependency;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.dependency.ActionDependency;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+
+public interface CoordInputDependency {
+
+    public static final String INTERNAL_VERSION_ID = "V=1";
+
+    /**
+     * Adds the input instance list.
+     *
+     * @param inputEventName the input event name
+     * @param inputInstanceList the input instance list
+     */
+    public void addInputInstanceList(String inputEventName, 
List<CoordInputInstance> inputInstanceList);
+
+    /**
+     * Gets the missing dependencies.
+     *
+     * @return the missing dependencies
+     */
+    public String getMissingDependencies();
+
+    /**
+     * Checks if dependencies are meet.
+     *
+     * @return true, if dependencies are meet
+     */
+    public boolean isDependencyMet();
+
+    /**
+     * Checks if is unresolved dependencies met.
+     *
+     * @return true, if unresolved dependencies are met
+     */
+    public boolean isUnResolvedDependencyMet();
+
+    /**
+     * Sets the dependency meet.
+     *
+     * @param isMissingDependenciesMet the new dependency met
+     */
+    public void setDependencyMet(boolean isMissingDependenciesMet);
+
+    /**
+     * Serialize.
+     *
+     * @return the string
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public String serialize() throws IOException;
+
+    /**
+     * Gets the missing dependencies as list.
+     *
+     * @return the missing dependencies as list
+     */
+    public List<String> getMissingDependenciesAsList();
+
+    /**
+     * Gets the available dependencies as list.
+     *
+     * @return the available dependencies as list
+     */
+    public List<String> getAvailableDependenciesAsList();
+
+    /**
+     * Sets the missing dependencies.
+     *
+     * @param missingDependencies the new missing dependencies
+     */
+    public void setMissingDependencies(String missingDependencies);
+
+    /**
+     * Adds the un resolved list.
+     *
+     * @param name the name
+     * @param tmpUnresolved the tmp unresolved
+     */
+    public void addUnResolvedList(String name, String tmpUnresolved);
+
+    /**
+     * Gets the available dependencies.
+     *
+     * @param dataSet the data set
+     * @return the available dependencies
+     */
+    public List<String> getAvailableDependencies(String dataSet);
+
+    /**
+     * Adds the to available dependencies.
+     *
+     * @param availDepList the avail dep list
+     */
+    public void addToAvailableDependencies(Collection<String> availDepList);
+
+    /**
+     * Check push missing dependencies.
+     *
+     * @param coordAction the coord action
+     * @param registerForNotification the register for notification
+     * @return the action dependency
+     * @throws CommandException the command exception
+     * @throws IOException Signals that an I/O exception has occurred.
+     * @throws JDOMException the JDOM exception
+     */
+    public ActionDependency checkPushMissingDependencies(CoordinatorActionBean 
coordAction,
+            boolean registerForNotification) throws CommandException, 
IOException, JDOMException;
+
+    /**
+     * Check pull missing dependencies.
+     *
+     * @param coordAction the coord action
+     * @param existList the exist list
+     * @param nonExistList the non exist list
+     * @return true, if successful
+     * @throws IOException Signals that an I/O exception has occurred.
+     * @throws JDOMException the JDOM exception
+     */
+    public boolean checkPullMissingDependencies(CoordinatorActionBean 
coordAction, StringBuilder existList,
+            StringBuilder nonExistList) throws IOException, JDOMException;
+
+    /**
+     * Checks if is change in dependency.
+     *
+     * @param nonExistList the non exist list
+     * @param missingDependencies the missing dependencies
+     * @param nonResolvedList the non resolved list
+     * @param status the status
+     * @return true, if is change in dependency
+     */
+    public boolean isChangeInDependency(StringBuilder nonExistList, String 
missingDependencies,
+            StringBuilder nonResolvedList, boolean status);
+
+    /**
+     * Check unresolved.
+     *
+     * @param coordAction the coord action
+     * @param eAction
+     * @return true, if successful
+     * @throws Exception the exception
+     */
+    public boolean checkUnresolved(CoordinatorActionBean coordAction, Element 
eAction)
+            throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependencyFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependencyFactory.java
 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependencyFactory.java
new file mode 100644
index 0000000..ad50890
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputDependencyFactory.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.dependency;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.StringBlob;
+import org.apache.oozie.util.WritableUtils;
+import org.apache.oozie.util.XLog;
+
+public class CoordInputDependencyFactory {
+
+    // We need to choose magic number which is not allowed for file/dir.
+    // Magic number is ::$
+    private static final byte[] MAGIC_NUMBER = new byte[] { 58, 58, 36 };
+    public static final String CHAR_ENCODING = "ISO-8859-1";
+
+    public static XLog LOG = XLog.getLog(CoordInputDependencyFactory.class);
+
+    /**
+     * Create the pull dependencies.
+     *
+     * @param isInputLogicSpecified to check if input logic is enable
+     * @return the pull dependencies
+     */
+    public static CoordInputDependency createPullInputDependencies(boolean 
isInputLogicSpecified) {
+        if (!isInputLogicSpecified) {
+            return new CoordOldInputDependency();
+        }
+        else {
+            return new CoordPullInputDependency();
+        }
+    }
+
+    /**
+     * Create the push dependencies.
+     *
+     * @param isInputLogicSpecified to check if input logic is enable
+     * @return the push dependencies
+     */
+    public static CoordInputDependency createPushInputDependencies(boolean 
isInputLogicSpecified) {
+        if (!isInputLogicSpecified) {
+            return new CoordOldInputDependency();
+        }
+        else {
+            return new CoordPushInputDependency();
+        }
+    }
+
+    /**
+     * Gets the pull input dependencies.
+     *
+     * @param missingDependencies the missing dependencies
+     * @return the pull input dependencies
+     */
+    public static CoordInputDependency getPullInputDependencies(StringBlob 
missingDependencies) {
+        if (missingDependencies == null) {
+            return new CoordPullInputDependency();
+        }
+        return getPullInputDependencies(missingDependencies.getString());
+    }
+
+    public static CoordInputDependency getPullInputDependencies(String 
dependencies) {
+
+        if (StringUtils.isEmpty(dependencies)) {
+            return new CoordPullInputDependency();
+        }
+
+        if (!hasInputLogic(dependencies)) {
+            return new CoordOldInputDependency(dependencies);
+        }
+        else
+            try {
+                return 
WritableUtils.fromByteArray(getDependenciesWithoutMagicNumber(dependencies).getBytes(CHAR_ENCODING),
+                        CoordPullInputDependency.class);
+            }
+            catch (UnsupportedEncodingException e) {
+                throw new RuntimeException(e);
+            }
+    }
+
+    /**
+     * Gets the push input dependencies.
+     *
+     * @param pushMissingDependencies the push missing dependencies
+     * @return the push input dependencies
+     */
+    public static CoordInputDependency getPushInputDependencies(StringBlob 
pushMissingDependencies) {
+
+        if (pushMissingDependencies == null) {
+            return new CoordPushInputDependency();
+        }
+        return getPushInputDependencies(pushMissingDependencies.getString());
+
+    }
+
+    public static CoordInputDependency getPushInputDependencies(String 
dependencies) {
+
+
+        if (StringUtils.isEmpty(dependencies)) {
+            return new CoordPushInputDependency();
+        }
+        if (!hasInputLogic(dependencies)) {
+            return new CoordOldInputDependency(dependencies);
+        }
+
+        else {
+            try {
+                return 
WritableUtils.fromByteArray(getDependenciesWithoutMagicNumber(dependencies).getBytes(CHAR_ENCODING),
+                        CoordPushInputDependency.class);
+            }
+            catch (UnsupportedEncodingException e) {
+                throw new RuntimeException(e);
+            }
+
+        }
+    }
+
+    /**
+     * Checks if input logic is enable.
+     *
+     * @param dependencies the dependencies
+     * @return true, if is input logic enable
+     */
+    private static boolean hasInputLogic(String dependencies) {
+        return dependencies.startsWith(getMagicNumber());
+    }
+
+    /**
+     * Gets the magic number.
+     *
+     * @return the magic number
+     */
+    public static String getMagicNumber() {
+        try {
+            return new String(MAGIC_NUMBER, CHAR_ENCODING);
+        }
+        catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    /**
+     * Gets the dependencies without magic number.
+     *
+     * @param dependencies the dependencies
+     * @return the dependencies without magic number
+     */
+    public static String getDependenciesWithoutMagicNumber(String 
dependencies) {
+        return dependencies.substring(getMagicNumber().length());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputInstance.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputInstance.java
 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputInstance.java
new file mode 100644
index 0000000..945fe44
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordInputInstance.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.dependency;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.oozie.util.WritableUtils;
+
+public class CoordInputInstance implements Writable {
+
+    private String inputDataInstance = "";
+    private boolean availability = false;
+
+    public CoordInputInstance() {
+
+    }
+
+    public CoordInputInstance(String inputDataInstance, boolean availability) {
+        this.inputDataInstance = inputDataInstance;
+        this.availability = availability;
+
+    }
+
+    /**
+     * Gets the input data instance.
+     *
+     * @return the input data instance
+     */
+    public String getInputDataInstance() {
+        return inputDataInstance;
+    }
+
+    /**
+     * Checks if is available.
+     *
+     * @return true, if is available
+     */
+    public boolean isAvailable() {
+        return availability;
+    }
+
+    public void setAvailability(boolean availability) {
+        this.availability = availability;
+    }
+
+    @Override
+    public String toString() {
+        return getInputDataInstance() + " : " + isAvailable();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        WritableUtils.writeStr(out, inputDataInstance);
+        out.writeBoolean(availability);
+
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        inputDataInstance = WritableUtils.readStr(in);
+        availability = in.readBoolean();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java
 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java
new file mode 100644
index 0000000..9fc348f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordOldInputDependency.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.dependency;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.coord.CoordCommandUtils;
+import org.apache.oozie.coord.CoordELConstants;
+import org.apache.oozie.coord.CoordELEvaluator;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.dependency.ActionDependency;
+import org.apache.oozie.dependency.DependencyChecker;
+import org.apache.oozie.dependency.URIHandlerException;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.ParamChecker;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+
+/**
+ * Old approach where dependencies are stored as String.
+ *
+ */
+public class CoordOldInputDependency implements CoordInputDependency {
+
+    private XLog log = XLog.getLog(getClass());
+
+    protected transient String missingDependencies = "";
+
+    public CoordOldInputDependency(String missingDependencies) {
+        this.missingDependencies = missingDependencies;
+    }
+
+    public CoordOldInputDependency() {
+    }
+
+    @Override
+    public void addInputInstanceList(String inputEventName, 
List<CoordInputInstance> inputInstanceList) {
+        appendToDependencies(inputInstanceList);
+    }
+
+    @Override
+    public String getMissingDependencies() {
+        return missingDependencies;
+    }
+
+    @Override
+    public boolean isDependencyMet() {
+        return StringUtils.isEmpty(missingDependencies);
+    }
+
+    @Override
+    public boolean isUnResolvedDependencyMet() {
+        return false;
+    }
+
+    @Override
+    public void setDependencyMet(boolean isDependencyMeet) {
+        if (isDependencyMeet) {
+            missingDependencies = "";
+        }
+
+    }
+
+    @Override
+    public String serialize() throws IOException {
+        return missingDependencies;
+    }
+
+    @Override
+    public List<String> getMissingDependenciesAsList() {
+        return 
Arrays.asList(DependencyChecker.dependenciesAsArray(missingDependencies));
+    }
+
+    @Override
+    public List<String> getAvailableDependenciesAsList() {
+        return new ArrayList<String>();
+    }
+
+    @Override
+    public void setMissingDependencies(String missingDependencies) {
+        this.missingDependencies = missingDependencies;
+
+    }
+
+    public void appendToDependencies(List<CoordInputInstance> 
inputInstanceList) {
+        StringBuilder sb = new StringBuilder(missingDependencies);
+        boolean isFirst = true;
+        for (CoordInputInstance coordInputInstance : inputInstanceList) {
+            if (isFirst) {
+                if (!StringUtils.isEmpty(sb.toString())) {
+                    sb.append(CoordELFunctions.INSTANCE_SEPARATOR);
+                }
+            }
+            else {
+                sb.append(CoordELFunctions.INSTANCE_SEPARATOR);
+
+            }
+            sb.append(coordInputInstance.getInputDataInstance());
+            isFirst = false;
+        }
+        missingDependencies = sb.toString();
+    }
+
+    @Override
+    public void addUnResolvedList(String name, String unresolvedDependencies) {
+        StringBuilder sb = new StringBuilder(missingDependencies);
+        
sb.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedDependencies);
+        missingDependencies = sb.toString();
+    }
+
+    @Override
+    public List<String> getAvailableDependencies(String dataSet) {
+        return null;
+    }
+
+    @Override
+    public void addToAvailableDependencies(Collection<String> availableList) {
+
+        if (StringUtils.isEmpty(missingDependencies)) {
+            return;
+        }
+        List<String> missingDependenciesList = new 
ArrayList<String>(Arrays.asList((DependencyChecker
+                .dependenciesAsArray(missingDependencies))));
+        missingDependenciesList.removeAll(availableList);
+        missingDependencies = 
DependencyChecker.dependenciesAsString(missingDependenciesList);
+
+    }
+
+    @Override
+    public boolean checkPullMissingDependencies(CoordinatorActionBean 
coordAction, StringBuilder existList,
+            StringBuilder nonExistList) throws IOException, JDOMException {
+        Configuration actionConf = new XConfiguration(new 
StringReader(coordAction.getRunConf()));
+        Element eAction = XmlUtils.parseXml(coordAction.getActionXml());
+
+        Element inputList = eAction.getChild("input-events", 
eAction.getNamespace());
+        if (inputList != null) {
+            if (nonExistList.length() > 0) {
+                checkListOfPaths(coordAction, existList, nonExistList, 
actionConf);
+            }
+            return nonExistList.length() == 0;
+        }
+        return true;
+    }
+
+    public ActionDependency checkPushMissingDependencies(CoordinatorActionBean 
coordAction,
+            boolean registerForNotification) throws CommandException, 
IOException {
+        return 
DependencyChecker.checkForAvailability(getMissingDependenciesAsList(), new 
XConfiguration(
+                new StringReader(coordAction.getRunConf())), 
!registerForNotification);
+    }
+
+    private boolean checkListOfPaths(CoordinatorActionBean coordAction, 
StringBuilder existList,
+            StringBuilder nonExistList, Configuration conf) throws IOException 
{
+
+        String[] uriList = 
nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
+        if (uriList[0] != null) {
+            log.info("[" + coordAction.getId() + "]::ActionInputCheck:: In 
checkListOfPaths: " + uriList[0]
+                    + " is Missing.");
+        }
+
+        nonExistList.delete(0, nonExistList.length());
+        boolean allExists = true;
+        String existSeparator = "", nonExistSeparator = "";
+        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), 
OozieClient.USER_NAME);
+        for (int i = 0; i < uriList.length; i++) {
+            if (allExists) {
+                allExists = pathExists(coordAction, uriList[i], conf, user);
+                log.info("[" + coordAction.getId() + "]::ActionInputCheck:: 
File:" + uriList[i] + ", Exists? :"
+                        + allExists);
+            }
+            if (allExists) {
+                existList.append(existSeparator).append(uriList[i]);
+                existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
+            }
+            else {
+                nonExistList.append(nonExistSeparator).append(uriList[i]);
+                nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
+            }
+        }
+        return allExists;
+    }
+
+    public boolean pathExists(CoordinatorActionBean coordAction, String sPath, 
Configuration actionConf, String user)
+            throws IOException {
+        log.debug("checking for the file " + sPath);
+        try {
+            return CoordCommandUtils.pathExists(sPath, actionConf, user);
+        }
+        catch (URIHandlerException e) {
+            if (coordAction != null) {
+                coordAction.setErrorCode(e.getErrorCode().toString());
+                coordAction.setErrorMessage(e.getMessage());
+            }
+            if (e.getCause() != null && e.getCause() instanceof 
AccessControlException) {
+                throw (AccessControlException) e.getCause();
+            }
+            else {
+                log.error(e);
+                throw new IOException(e);
+            }
+        }
+        catch (URISyntaxException e) {
+            if (coordAction != null) {
+                coordAction.setErrorCode(ErrorCode.E0906.toString());
+                coordAction.setErrorMessage(e.getMessage());
+            }
+            log.error(e);
+            throw new IOException(e);
+        }
+    }
+
+    public boolean isChangeInDependency(StringBuilder nonExistList, String 
missingDependencies,
+            StringBuilder nonResolvedList, boolean status) {
+        if ((!nonExistList.toString().equals(missingDependencies) || 
missingDependencies.isEmpty())) {
+            setMissingDependencies(nonExistList.toString());
+            return true;
+        }
+        return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    public boolean checkUnresolved(CoordinatorActionBean coordAction, Element 
eAction)
+            throws Exception {
+        Date nominalTime = 
DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
+        String actualTimeStr = eAction.getAttributeValue("action-actual-time");
+        Element inputList = eAction.getChild("input-events", 
eAction.getNamespace());
+
+        List<Element> eDataEvents = inputList.getChildren("data-in", 
eAction.getNamespace());
+        Configuration actionConf = new XConfiguration(new 
StringReader(coordAction.getRunConf()));
+
+        Date actualTime = null;
+        if (actualTimeStr == null) {
+            actualTime = new Date();
+        }
+        else {
+            actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
+        }
+
+        for (Element dEvent : eDataEvents) {
+            if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, 
dEvent.getNamespace()) == null) {
+                continue;
+            }
+            ELEvaluator eval = 
CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, 
actionConf);
+            String unResolvedInstance = 
dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG,
+                    dEvent.getNamespace()).getTextTrim();
+            String unresolvedList[] = 
unResolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
+            StringBuffer resolvedTmp = new StringBuffer();
+            for (int i = 0; i < unresolvedList.length; i++) {
+                String returnData = CoordELFunctions.evalAndWrap(eval, 
unresolvedList[i]);
+                Boolean isResolved = (Boolean) 
eval.getVariable(CoordELConstants.IS_RESOLVED);
+                if (isResolved == false) {
+                    log.info("[" + coordAction.getId() + "] :: Cannot resolve 
: " + returnData);
+                    return false;
+                }
+                if (resolvedTmp.length() > 0) {
+                    resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
+                }
+                resolvedTmp.append((String) 
eval.getVariable(CoordELConstants.RESOLVED_PATH));
+            }
+            if (resolvedTmp.length() > 0) {
+                if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
+                    
resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
+                            dEvent.getChild("uris", 
dEvent.getNamespace()).getTextTrim());
+                    dEvent.removeChild("uris", dEvent.getNamespace());
+                }
+                Element uriInstance = new Element("uris", 
dEvent.getNamespace());
+                uriInstance.addContent(resolvedTmp.toString());
+                dEvent.getContent().add(1, uriInstance);
+            }
+            dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, 
dEvent.getNamespace());
+        }
+
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java
 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java
new file mode 100644
index 0000000..f20dcae
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPullInputDependency.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.dependency;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.command.coord.CoordCommandUtils;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.util.WritableUtils;
+
+public class CoordPullInputDependency extends AbstractCoordInputDependency {
+    private Map<String, CoordUnResolvedInputDependency> unResolvedList = new 
HashMap<String, CoordUnResolvedInputDependency>();
+
+    public CoordPullInputDependency() {
+        super();
+
+    }
+
+    public void addResolvedList(String dataSet, String list) {
+        
unResolvedList.get(dataSet).addResolvedList(Arrays.asList(list.split(",")));
+    }
+
+    public CoordUnResolvedInputDependency getUnResolvedDependency(String 
dataSet) {
+        return unResolvedList.get(dataSet);
+    }
+
+    public boolean isUnResolvedDependencyMet() {
+        for (CoordUnResolvedInputDependency coordUnResolvedDependency : 
unResolvedList.values()) {
+            if (!coordUnResolvedDependency.isResolved()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public void addUnResolvedList(String dataSet, String dependency) {
+        unResolvedList.put(dataSet, new 
CoordUnResolvedInputDependency(Arrays.asList(dependency.split("#"))));
+    }
+
+    public String getMissingDependencies() {
+        StringBuffer bf = new StringBuffer(super.getMissingDependencies());
+        String unresolvedMissingDependencies = 
getUnresolvedMissingDependencies();
+        if (!StringUtils.isEmpty(unresolvedMissingDependencies)) {
+            bf.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR);
+            bf.append(unresolvedMissingDependencies);
+        }
+        return bf.toString();
+    }
+
+    public String getUnresolvedMissingDependencies() {
+        StringBuffer bf = new StringBuffer();
+        if (unResolvedList != null) {
+            for (CoordUnResolvedInputDependency coordUnResolvedDependency : 
unResolvedList.values()) {
+                if (!coordUnResolvedDependency.isResolved()) {
+                    String unresolvedList = 
coordUnResolvedDependency.getUnResolvedList();
+                    if (bf.length() > 0 && !unresolvedList.isEmpty()) {
+                        bf.append(CoordELFunctions.INSTANCE_SEPARATOR);
+                    }
+                    bf.append(unresolvedList);
+                }
+            }
+        }
+        return bf.toString();
+    }
+
+    protected void generateDependencies() {
+        super.generateDependencies();
+    }
+
+    private void writeObject(ObjectOutputStream os) throws IOException, 
ClassNotFoundException {
+        os.writeObject(unResolvedList);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+        unResolvedList = (Map<String, CoordUnResolvedInputDependency>) 
in.readObject();
+        generateDependencies();
+    }
+
+    public boolean isDependencyMet() {
+        return isResolvedDependencyMeet() && isUnResolvedDependencyMet();
+
+    }
+
+    public boolean isResolvedDependencyMeet() {
+        return super.isDependencyMet();
+
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        WritableUtils.writeMap(out, unResolvedList);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        unResolvedList = WritableUtils.readMap(in, 
CoordUnResolvedInputDependency.class);
+    }
+
+    @Override
+    public void setMissingDependencies(String join) {
+        // We don't have to set this for input logic. Dependency map will have 
computed missing dependencies
+    }
+
+    @Override
+    public List<String> getAvailableDependencies(String dataSet) {
+        List<String> availableList = new ArrayList<String>();
+        availableList.addAll(super.getAvailableDependencies(dataSet));
+        if (getUnResolvedDependency(dataSet) != null) {
+            
availableList.addAll(getUnResolvedDependency(dataSet).getResolvedList());
+        }
+        return availableList;
+    }
+
+    public boolean isDataSetResolved(String dataSet) {
+        if(unResolvedList.containsKey(dataSet)){
+            return unResolvedList.get(dataSet).isResolved();
+        }
+        else{
+            return super.isDataSetResolved(dataSet);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPushInputDependency.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPushInputDependency.java
 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPushInputDependency.java
new file mode 100644
index 0000000..e19e799
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordPushInputDependency.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.dependency;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class CoordPushInputDependency extends AbstractCoordInputDependency {
+
+    public CoordPushInputDependency() {
+        super();
+    }
+
+    @Override
+    public void setMissingDependencies(String join) {
+    }
+
+    @Override
+    public void addUnResolvedList(String name, String tmpUnresolved) {
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordUnResolvedInputDependency.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordUnResolvedInputDependency.java
 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordUnResolvedInputDependency.java
new file mode 100644
index 0000000..096b588
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/dependency/CoordUnResolvedInputDependency.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.dependency;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.util.WritableUtils;
+
+public class CoordUnResolvedInputDependency implements Writable {
+
+    private boolean isResolved;
+    private List<String> dependency = new ArrayList<String>();
+    private List<String> resolvedList = new ArrayList<String>();
+
+    public CoordUnResolvedInputDependency(List<String> dependency) {
+        this.dependency = dependency;
+
+    }
+
+    public CoordUnResolvedInputDependency() {
+    }
+
+    public boolean isResolved() {
+        return isResolved;
+    }
+
+    public void setResolved(boolean isResolved) {
+        this.isResolved = isResolved;
+    }
+
+    public List<String> getDependencies() {
+        return dependency;
+    }
+
+    public List<String> getResolvedList() {
+        return resolvedList;
+    }
+
+    public void setResolvedList(List<String> resolvedList) {
+        this.resolvedList = resolvedList;
+    }
+
+    public void addResolvedList(List<String> resolvedList) {
+        this.resolvedList.addAll(resolvedList);
+    }
+
+    public String getUnResolvedList() {
+        if (!isResolved) {
+            return StringUtils.join(dependency, 
CoordELFunctions.INSTANCE_SEPARATOR);
+        }
+        else
+            return "";
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeBoolean(isResolved);
+        WritableUtils.writeStringList(out, dependency);
+        WritableUtils.writeStringList(out, resolvedList);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+
+        isResolved = in.readBoolean();
+        dependency = WritableUtils.readStringList(in);
+        resolvedList = WritableUtils.readStringList(in);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicBuilder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicBuilder.java
 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicBuilder.java
new file mode 100644
index 0000000..2326cd7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicBuilder.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+
+public class CoordInputLogicBuilder {
+
+    StringBuffer bf = new StringBuffer();
+
+    CoordInputLogicEvaluator coordInputlogicEvaluator;
+
+    /** The Dependency builder. */
+    public CoordInputDependencyBuilder dependencyBuilder;
+
+    public CoordInputLogicBuilder(CoordInputLogicEvaluator 
coordInputlogicEvaluator) {
+        this.coordInputlogicEvaluator = coordInputlogicEvaluator;
+        dependencyBuilder = new 
CoordInputDependencyBuilder(coordInputlogicEvaluator);
+    }
+
+    /**
+     * Input function of input-logic
+     *
+     * @param inputDataset the dataset
+     * @return the string
+     */
+    public CoordInputLogicEvaluatorResult input(String inputDataset) {
+        return coordInputlogicEvaluator.evalInput(inputDataset, -1, -1);
+
+    }
+
+    /**
+     * Combine function of dataset
+     *
+     * @param combineDatasets the combine
+     * @return the string
+     */
+    public CoordInputLogicEvaluatorResult combine(String... combineDatasets) {
+        return coordInputlogicEvaluator.evalCombineInput(combineDatasets, -1, 
-1);
+    }
+
+    /**
+     * The Class CoordInputDependencyBuilder.
+     */
+    public static class CoordInputDependencyBuilder {
+
+        CoordInputLogicEvaluator coordInputLogicEvaluator;
+
+        public CoordInputDependencyBuilder(CoordInputLogicEvaluator 
coordInputLogicEvaluator) {
+            this.coordInputLogicEvaluator = coordInputLogicEvaluator;
+
+        }
+
+        private int minValue = -1;
+        private String wait;
+        private String inputDataset;
+        private String[] combineDatasets;
+
+        /**
+         * Construct min function
+         *
+         * @param minValue the min value
+         * @return the coord input dependency builder
+         */
+        public CoordInputDependencyBuilder min(int minValue) {
+            this.minValue = minValue;
+            return this;
+        }
+
+        /**
+         * Construct  wait function
+         *
+         * @param wait the wait
+         * @return the coord input dependency builder
+         */
+        public CoordInputDependencyBuilder inputWait(String wait) {
+            this.wait = wait;
+            return this;
+        }
+
+        /**
+         * Construct wait function
+         *
+         * @param wait the wait
+         * @return the coord input dependency builder
+         */
+        public CoordInputDependencyBuilder inputWait(int wait) {
+            this.wait = String.valueOf(wait);
+            return this;
+        }
+
+        /**
+         * Construct input function
+         *
+         * @param dataset the input
+         * @return the coord input dependency builder
+         */
+        public CoordInputDependencyBuilder input(String dataset) {
+            this.inputDataset = dataset;
+            return this;
+        }
+
+        /**
+         * Construct complie function
+         *
+         * @param combineDatasets the combine
+         * @return the coord input dependency builder
+         */
+        public CoordInputDependencyBuilder combine(String... combineDatasets) {
+            this.combineDatasets = combineDatasets;
+            return this;
+        }
+
+        /**
+         * Build inputlogic expression
+         *
+         * @return the string
+         * @throws IOException Signals that an I/O exception has occurred.
+         */
+        public CoordInputLogicEvaluatorResult build() throws IOException {
+            if (combineDatasets != null) {
+                return 
coordInputLogicEvaluator.evalCombineInput(combineDatasets, minValue, 
getTime(wait));
+            }
+            else {
+                return coordInputLogicEvaluator.evalInput(inputDataset, 
minValue, getTime(wait));
+            }
+        }
+
+        /**
+         * Gets the time in min.
+         *
+         * @param value the value
+         * @return the time in min
+         * @throws IOException Signals that an I/O exception has occurred.
+         */
+        private int getTime(String value) throws IOException {
+            if (StringUtils.isEmpty(value)) {
+                return -1;
+            }
+            if (StringUtils.isNumeric(value)) {
+                return Integer.parseInt(value);
+            }
+            else {
+                throw new IOException("Unsupported time : " + value);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluator.java
 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluator.java
new file mode 100644
index 0000000..c495570
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+public interface CoordInputLogicEvaluator {
+    public static final String INPUT_LOGIC = "input-logic";
+
+
+    /**
+     * Eval input.
+     *
+     * @param inputDataSet the input data set
+     * @param min the min
+     * @param wait the wait
+     * @return the coord input logic evaluator result
+     */
+    public CoordInputLogicEvaluatorResult evalInput(String inputDataSet, int 
min, int wait);
+
+    /**
+     * Eval combine input.
+     *
+     * @param combineDatasets the combine datasets
+     * @param min the min
+     * @param wait the wait
+     * @return the coord input logic evaluator result
+     */
+    public CoordInputLogicEvaluatorResult evalCombineInput(String[] 
combineDatasets, int min, int wait);
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java
 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java
new file mode 100644
index 0000000..f54d305
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseOne.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URISyntaxException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.command.coord.CoordCommandUtils;
+import org.apache.oozie.coord.input.dependency.AbstractCoordInputDependency;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
+import org.apache.oozie.coord.input.dependency.CoordInputInstance;
+import 
org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorResult.STATUS;
+import org.apache.oozie.dependency.URIHandlerException;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
+
+/**
+ * PhaseOne is for all dependencies check, except unresolved. Unresolved will 
be checked as part of phaseTwo.
+ * Phasethree is only to get dependencies from dataset, no hdfs/hcat check.
+ */
+public class CoordInputLogicEvaluatorPhaseOne implements 
CoordInputLogicEvaluator {
+
+    protected AbstractCoordInputDependency coordInputDependency;
+    protected Map<String, List<CoordInputInstance>> dependencyMap;
+    protected CoordinatorActionBean coordAction = null;
+    protected XLog log = XLog.getLog(getClass());
+
+    public CoordInputLogicEvaluatorPhaseOne(CoordinatorActionBean coordAction) 
{
+        this(coordAction, coordAction.getPullInputDependencies());
+    }
+
+    public CoordInputLogicEvaluatorPhaseOne(CoordinatorActionBean coordAction, 
CoordInputDependency coordInputDependency) {
+        this.coordAction = coordAction;
+        this.coordInputDependency = (AbstractCoordInputDependency) 
coordInputDependency;
+        dependencyMap = ((AbstractCoordInputDependency) 
coordInputDependency).getDependencyMap();
+        LogUtils.setLogInfo(coordAction.getId());
+
+    }
+
+    public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, 
int wait) {
+        return input(coordInputDependency, dataSet, min, wait);
+
+    }
+
+    /**
+     * Evaluate input function with min and wait
+     *
+     * @param coordInputDependency
+     * @param dataSet
+     * @param min
+     * @param wait
+     * @return the coord input logic evaluator result
+     */
+    public CoordInputLogicEvaluatorResult input(AbstractCoordInputDependency 
coordInputDependency, String dataSet,
+            int min, int wait) {
+
+        List<String> availableList = new ArrayList<String>();
+        if (coordInputDependency.getDependencyMap().get(dataSet) == null) {
+            CoordInputLogicEvaluatorResult retData = new 
CoordInputLogicEvaluatorResult();
+            if (coordInputDependency.getAvailableDependencies(dataSet) == null
+                    || 
coordInputDependency.getAvailableDependencies(dataSet).isEmpty()) {
+                log.debug("Data set [{0}] is unresolved set, will get resolved 
in phasetwo", dataSet);
+                
retData.setStatus(CoordInputLogicEvaluatorResult.STATUS.PHASE_TWO_EVALUATION);
+            }
+            else {
+                return getResultFromPullPush(coordAction, dataSet, min);
+            }
+            return retData;
+        }
+        boolean allFound = true;
+        try {
+            Configuration actionConf = new XConfiguration(new 
StringReader(coordAction.getRunConf()));
+            List<CoordInputInstance> firstInputSetList = 
coordInputDependency.getDependencyMap().get(dataSet);
+            for (int i = 0; i < firstInputSetList.size(); i++) {
+                CoordInputInstance coordInputInstance = 
firstInputSetList.get(i);
+                if (!coordInputInstance.isAvailable()) {
+                    if (pathExists(coordInputInstance.getInputDataInstance(), 
actionConf)) {
+                        
availableList.add(coordInputInstance.getInputDataInstance());
+                        
coordInputDependency.addToAvailableDependencies(dataSet, coordInputInstance);
+                    }
+                    else {
+                        log.debug("[{0} is not found ", 
coordInputInstance.getInputDataInstance());
+                        allFound = false;
+                        // Stop looking for dependencies, if min is not 
specified.
+                        if (min < 0) {
+                            break;
+                        }
+                    }
+                }
+                else {
+                    
availableList.add(coordInputInstance.getInputDataInstance());
+                }
+            }
+        }
+        catch (Exception e) {
+            log.error(e);
+            throw new RuntimeException(ErrorCode.E1028.format("Error executing 
input function " + e.getMessage()));
+        }
+        CoordInputLogicEvaluatorResult retData = getEvalResult(allFound, min, 
wait, availableList);
+
+        log.debug("Resolved status of Data set [{0}] with min [{1}] and wait 
[{2}]  =  [{3}]", dataSet, min, wait,
+                retData.getStatus());
+        return retData;
+    }
+
+    public boolean isInputWaitElapsed(int timeInMin) {
+
+        if (timeInMin == -1) {
+            return true;
+        }
+        long waitingTime = (new Date().getTime() - 
Math.max(coordAction.getNominalTime().getTime(), coordAction
+                .getCreatedTime().getTime()))
+                / (60 * 1000);
+        return timeInMin <= waitingTime;
+    }
+
+    public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, 
int min, int wait) {
+        return combine(coordInputDependency, inputSets, min, wait);
+    }
+
+    public CoordInputLogicEvaluatorResult combine(AbstractCoordInputDependency 
coordInputDependency,
+            String[] inputSets, int min, int wait) {
+
+        List<String> availableList = new ArrayList<String>();
+
+        if (coordInputDependency.getDependencyMap().get(inputSets[0]) == null) 
{
+            return new 
CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.TIMED_WAITING);
+        }
+
+        try {
+
+            Configuration jobConf = new XConfiguration(new 
StringReader(coordAction.getRunConf()));
+            String firstInputSet = inputSets[0];
+            List<CoordInputInstance> firstInputSetList = 
coordInputDependency.getDependencyMap().get(firstInputSet);
+            for (int i = 0; i < firstInputSetList.size(); i++) {
+                CoordInputInstance coordInputInstance = 
firstInputSetList.get(i);
+                boolean found = false;
+                if (!coordInputInstance.isAvailable()) {
+                    if (!pathExists(coordInputInstance.getInputDataInstance(), 
jobConf)) {
+                        log.debug(MessageFormat.format("{0} is not found. 
Looking from other datasets.",
+                                coordInputInstance.getInputDataInstance()));
+                        for (int j = 1; j < inputSets.length; j++) {
+                            if 
(!coordInputDependency.getDependencyMap().get(inputSets[j]).get(i).isAvailable())
 {
+                                if 
(pathExists(coordInputDependency.getDependencyMap().get(inputSets[j]).get(i)
+                                        .getInputDataInstance(), jobConf)) {
+                                    
coordInputDependency.addToAvailableDependencies(inputSets[j], 
coordInputDependency
+                                            
.getDependencyMap().get(inputSets[j]).get(i));
+                                    
availableList.add(coordInputDependency.getDependencyMap().get(inputSets[j]).get(i)
+                                            .getInputDataInstance());
+                                    log.debug(MessageFormat.format("{0} is 
found.",
+                                            
coordInputInstance.getInputDataInstance()));
+                                    found = true;
+                                }
+
+                            }
+                            else {
+                                
coordInputDependency.addToAvailableDependencies(inputSets[j], 
coordInputDependency
+                                        
.getDependencyMap().get(inputSets[j]).get(i));
+                                
availableList.add(coordInputDependency.getDependencyMap().get(inputSets[j]).get(i)
+                                        .getInputDataInstance());
+                                found = true;
+
+                            }
+                        }
+                    }
+                    else {
+                        
coordInputDependency.addToAvailableDependencies(firstInputSet, 
coordInputInstance);
+                        
availableList.add(coordInputInstance.getInputDataInstance());
+                        found = true;
+                    }
+                }
+                else {
+                    
availableList.add(coordInputInstance.getInputDataInstance());
+                    found = true;
+                }
+
+                if (min < 0 && !found) {
+                    // Stop looking for dependencies, if min is not specified.
+                    break;
+                }
+
+            }
+        }
+        catch (Exception e) {
+            log.error(e);
+            throw new RuntimeException(ErrorCode.E1028.format("Error executing 
combine function " + e.getMessage()));
+        }
+        boolean allFound = availableList.size() == 
coordInputDependency.getDependencyMap().get(inputSets[0]).size();
+        CoordInputLogicEvaluatorResult retData = getEvalResult(allFound, min, 
wait, availableList);
+        log.debug("Resolved status of Data set [{0}] with min [{1}] and wait 
[{2}]  =  [{3}]",
+                Arrays.toString(inputSets), min, wait, retData.getStatus());
+        return retData;
+
+    }
+
+    public Configuration getConf() throws IOException {
+        return new XConfiguration(new StringReader(coordAction.getRunConf()));
+
+    }
+
+    public String getListAsString(List<String> list, String dataset) {
+        if (list == null || list.isEmpty()) {
+            return "";
+        }
+        StringBuilder sb = new StringBuilder();
+        for (int i = 1; i < list.size(); i++) {
+            sb.append(list.get(i - 1)).append(",");
+        }
+        sb.append(list.get(list.size() - 1));
+        return sb.toString();
+    }
+
+    protected CoordInputLogicEvaluatorResult getEvalResult(boolean found, int 
min, int wait, List<String> availableList) {
+        CoordInputLogicEvaluatorResult retData = new 
CoordInputLogicEvaluatorResult();
+        if (!found && wait > 0) {
+            if (!isInputWaitElapsed(wait)) {
+                return new 
CoordInputLogicEvaluatorResult(STATUS.TIMED_WAITING);
+            }
+        }
+
+        if (found || (min > 0 && availableList.size() >= min)) {
+            retData.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
+            retData.setDataSets(getListAsString(availableList, null));
+        }
+
+        if (min == 0) {
+            retData.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
+        }
+
+        return retData;
+    }
+
+    protected boolean pathExists(String sPath, Configuration jobConf) throws 
IOException, URISyntaxException,
+            URIHandlerException {
+        return CoordCommandUtils.pathExists(sPath, jobConf);
+
+    }
+
+    public CoordInputLogicEvaluatorResult 
getResultFromPullPush(CoordinatorActionBean coordAction, String dataSet, int 
min) {
+        CoordInputLogicEvaluatorResult result = new 
CoordInputLogicEvaluatorResult();
+        CoordInputLogicEvaluatorResult pullResult = getEvalResult(
+                (AbstractCoordInputDependency) 
coordAction.getPullInputDependencies(), dataSet, min);
+        CoordInputLogicEvaluatorResult pushResult = getEvalResult(
+                (AbstractCoordInputDependency) 
coordAction.getPushInputDependencies(), dataSet, min);
+        result.appendDataSets(pullResult.getDataSets());
+        result.appendDataSets(pushResult.getDataSets());
+
+        if (pullResult.isWaiting() || pushResult.isWaiting()) {
+            result.setStatus(STATUS.TIMED_WAITING);
+        }
+
+        else if (pullResult.isPhaseTwoEvaluation() || 
pushResult.isPhaseTwoEvaluation()) {
+            result.setStatus(STATUS.PHASE_TWO_EVALUATION);
+        }
+
+        else if (pullResult.isTrue() || pushResult.isTrue()) {
+            result.setStatus(STATUS.TRUE);
+        }
+        else {
+            result.setStatus(STATUS.FALSE);
+        }
+        return result;
+
+    }
+
+    /**
+     * Gets evaluator Result
+     *
+     * @param coordInputDependencies the coord dependencies
+     * @param dataSet the data set
+     * @param min the min
+     * @return the coord input logic evaluator result
+     */
+    public CoordInputLogicEvaluatorResult 
getEvalResult(AbstractCoordInputDependency coordInputDependencies,
+            String dataSet, int min) {
+        CoordInputLogicEvaluatorResult result = new 
CoordInputLogicEvaluatorResult();
+        if ((coordInputDependencies.getAvailableDependencies(dataSet) == null 
|| coordInputDependencies
+                .getAvailableDependencies(dataSet).isEmpty())) {
+            if (min == 0) {
+                result.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
+            }
+            else {
+                result.setStatus(CoordInputLogicEvaluatorResult.STATUS.FALSE);
+            }
+        }
+
+        if (min > -1 && 
coordInputDependencies.getAvailableDependencies(dataSet).size() >= min) {
+            result.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
+            
result.appendDataSets(getListAsString(coordInputDependencies.getAvailableDependencies(dataSet),
 dataSet));
+        }
+
+        else if (coordInputDependencies.isDataSetResolved(dataSet)) {
+            result.setStatus(CoordInputLogicEvaluatorResult.STATUS.TRUE);
+            
result.appendDataSets(getListAsString(coordInputDependencies.getAvailableDependencies(dataSet),
 dataSet));
+        }
+        return result;
+    }
+}

Reply via email to