http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseThree.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseThree.java
 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseThree.java
new file mode 100644
index 0000000..31cf081
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseThree.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.coord.input.dependency.AbstractCoordInputDependency;
+import org.apache.oozie.coord.input.dependency.CoordInputInstance;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.dependency.URIHandlerException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.util.ELEvaluator;
+
+/**
+ * Input-logic evaluator that {@code CoordInputLogicEvaluatorUtil#getInputDependencies} hands to the
+ * input-logic builder in order to obtain the data-set string of an action.
+ * <p>
+ * It operates on the pull input dependencies of the action passed to the constructor. In this phase
+ * {@link #pathExists(String, Configuration)} always answers {@code false} and
+ * {@link #isInputWaitElapsed(int)} always answers {@code true}. Paths are passed through the URI handler
+ * of the path so that the done flag of the dataset is stripped.
+ */
+public class CoordInputLogicEvaluatorPhaseThree extends CoordInputLogicEvaluatorPhaseOne {
+
+    /** Evaluator queried for the {@code .datain.<dataSet>.doneFlag} variable of each dataset. */
+    ELEvaluator eval;
+
+    /**
+     * Creates the evaluator on top of the pull input dependencies of the given action.
+     *
+     * @param coordAction the coordinator action; its pull dependencies are cast to
+     *        {@link AbstractCoordInputDependency}
+     * @param eval the EL evaluator used to look up done flags
+     */
+    public CoordInputLogicEvaluatorPhaseThree(CoordinatorActionBean coordAction, ELEvaluator eval) {
+        super(coordAction, (AbstractCoordInputDependency) coordAction.getPullInputDependencies());
+        this.eval = eval;
+    }
+
+    /**
+     * Evaluates a single dataset by delegating to {@code getResultFromPullPush}.
+     *
+     * @param dataSet the dataset name
+     * @param min the minimum passed on to {@code getResultFromPullPush}
+     * @param wait not used by this phase
+     * @return the result produced by {@code getResultFromPullPush}
+     */
+    @Override
+    public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) {
+        return getResultFromPullPush(coordAction, dataSet, min);
+    }
+
+    /**
+     * Evaluates a combine of several datasets against the dependency held by this evaluator.
+     *
+     * @param inputSets the dataset names, in priority order
+     * @param min the minimum passed on to {@code getEvalResult}
+     * @param wait the wait passed on to {@code getEvalResult}
+     * @return the result of {@link #combine(AbstractCoordInputDependency, String[], int, int)}
+     */
+    @Override
+    public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) {
+        return combine(coordInputDependency, inputSets, min, wait);
+    }
+
+    /**
+     * Combines the instances of several datasets position by position.
+     * <p>
+     * For every instance index of the first dataset, the instance of the first dataset is used when it is
+     * available; otherwise the first available instance at the same index among the remaining datasets (in
+     * the given order) is used. At most one path is collected per index, so the collected list can never
+     * hold more entries than the first dataset has instances.
+     *
+     * @param coordInputDependency the dependency whose dependency map is inspected
+     * @param inputSets the dataset names, in priority order
+     * @param min the minimum passed on to {@code getEvalResult}
+     * @param wait the wait passed on to {@code getEvalResult}
+     * @return a {@code FALSE} result when the first dataset is not in the dependency map, otherwise the
+     *         result of {@code getEvalResult} for the collected paths
+     * @throws RuntimeException (message formatted with {@code ErrorCode.E1028}) when anything fails while
+     *         collecting the paths; the original exception is attached as the cause
+     */
+    public CoordInputLogicEvaluatorResult combine(AbstractCoordInputDependency coordInputDependency,
+            String[] inputSets, int min, int wait) {
+
+        List<String> availableList = new ArrayList<String>();
+
+        String firstInputSet = inputSets[0];
+        List<CoordInputInstance> firstInputSetList = coordInputDependency.getDependencyMap().get(firstInputSet);
+        if (firstInputSetList == null) {
+            return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE);
+        }
+
+        try {
+            for (int i = 0; i < firstInputSetList.size(); i++) {
+                CoordInputInstance coordInputInstance = firstInputSetList.get(i);
+                if (coordInputInstance.isAvailable()) {
+                    availableList.add(getPathWithoutDoneFlag(coordInputInstance.getInputDataInstance(),
+                            firstInputSet));
+                    continue;
+                }
+                for (int j = 1; j < inputSets.length; j++) {
+                    CoordInputInstance alternate = coordInputDependency.getDependencyMap().get(inputSets[j])
+                            .get(i);
+                    if (alternate.isAvailable()) {
+                        availableList.add(getPathWithoutDoneFlag(alternate.getInputDataInstance(),
+                                inputSets[j]));
+                        // Only one replacement per instance: stop at the first dataset that has it.
+                        break;
+                    }
+                }
+            }
+        }
+        catch (Exception e) {
+            log.error(e);
+            throw new RuntimeException(
+                    ErrorCode.E1028.format("Error executing combine function " + e.getMessage()), e);
+        }
+        boolean allFound = availableList.size() == firstInputSetList.size();
+        return getEvalResult(allFound, min, wait, availableList);
+    }
+
+    /**
+     * Path existence is not checked in this phase.
+     *
+     * @param sPath ignored
+     * @param actionConf ignored
+     * @return always {@code false}
+     */
+    protected boolean pathExists(String sPath, Configuration actionConf) throws IOException, URISyntaxException,
+            URIHandlerException {
+        return false;
+    }
+
+    /**
+     * The input wait is always treated as elapsed in this phase.
+     *
+     * @param timeInMin ignored
+     * @return always {@code true}
+     */
+    public boolean isInputWaitElapsed(int timeInMin) {
+        return true;
+    }
+
+    /**
+     * Joins the given paths with {@code ","} after stripping the done flag of the dataset from each one.
+     *
+     * @param input the paths; may be {@code null} or empty
+     * @param dataSet the dataset the paths belong to; when {@code null} the paths are joined unchanged
+     * @return the comma separated paths, or an empty string when {@code input} is {@code null} or empty
+     * @throws RuntimeException (message formatted with {@code ErrorCode.E1028}) when the URI handler
+     *         fails; the original exception is attached as the cause
+     */
+    public String getListAsString(List<String> input, String dataSet) {
+        if (input == null || input.isEmpty()) {
+            return "";
+        }
+        StringBuilder sb = new StringBuilder();
+        try {
+            for (int i = 0; i < input.size(); i++) {
+                if (i > 0) {
+                    sb.append(",");
+                }
+                sb.append(getPathWithoutDoneFlag(input.get(i), dataSet));
+            }
+        }
+        catch (URIHandlerException e) {
+            log.error(e);
+            throw new RuntimeException(
+                    ErrorCode.E1028.format("Error finding path without done flag " + e.getMessage()), e);
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Strips the done flag of the given dataset from a path.
+     *
+     * @param sPath the path
+     * @param dataSet the dataset name; when {@code null} the path is returned unchanged
+     * @return the path without the done flag
+     * @throws URIHandlerException if thrown while obtaining or using the URI handler
+     */
+    private String getPathWithoutDoneFlag(String sPath, String dataSet) throws URIHandlerException {
+        if (dataSet == null) {
+            return sPath;
+        }
+        URIHandlerService service = Services.get().get(URIHandlerService.class);
+        URIHandler handler = service.getURIHandler(sPath);
+        // NOTE(review): assumes the ".datain.<dataSet>.doneFlag" variable is always set on the evaluator;
+        // a missing variable fails here with a NullPointerException - confirm with the caller.
+        String doneFlag = eval.getVariable(".datain." + dataSet + ".doneFlag").toString();
+        return handler.getURIWithoutDoneFlag(sPath, doneFlag);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseTwo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseTwo.java
 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseTwo.java
new file mode 100644
index 0000000..16fc400
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseTwo.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.coord.CoordELConstants;
+import org.apache.oozie.coord.CoordELEvaluator;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.coord.input.dependency.AbstractCoordInputDependency;
+import org.apache.oozie.coord.input.dependency.CoordPullInputDependency;
+import 
org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorResult.STATUS;
+import org.apache.oozie.dependency.DependencyChecker;
+import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+
+public class CoordInputLogicEvaluatorPhaseTwo extends 
CoordInputLogicEvaluatorPhaseOne {
+
+    Date actualTime;
+
+    public CoordInputLogicEvaluatorPhaseTwo(CoordinatorActionBean coordAction, 
Date actualTime) {
+        super(coordAction);
+        this.actualTime = actualTime;
+    }
+
+    public CoordInputLogicEvaluatorPhaseTwo(CoordinatorActionBean coordAction,
+            AbstractCoordInputDependency coordInputDependency) {
+        super(coordAction, coordInputDependency);
+    }
+
+    @Override
+    public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, 
int wait) {
+        try {
+            CoordPullInputDependency coordPullInputDependency = 
(CoordPullInputDependency) coordInputDependency;
+            ELEvaluator eval = 
CoordELEvaluator.createLazyEvaluator(actualTime, coordAction.getNominalTime(),
+                    getInputSetEvent(dataSet), getConf());
+            if (coordPullInputDependency.getUnResolvedDependency(dataSet) == 
null) {
+                return super.evalInput(dataSet, min, wait);
+
+            }
+            else {
+                cleanPreviousCheckData(coordPullInputDependency, dataSet);
+                List<String> unresolvedList = 
coordPullInputDependency.getUnResolvedDependency(dataSet)
+                        .getDependencies();
+                for (String unresolved : unresolvedList) {
+                    String resolvedPath = "";
+
+                    CoordELFunctions.evalAndWrap(eval, unresolved);
+                    boolean isResolved = (Boolean) 
eval.getVariable(CoordELConstants.IS_RESOLVED);
+
+                    coordPullInputDependency.setDependencyMap(dependencyMap);
+                    if (eval.getVariable(CoordELConstants.RESOLVED_PATH) != 
null) {
+                        resolvedPath = 
eval.getVariable(CoordELConstants.RESOLVED_PATH).toString();
+                    }
+                    if (resolvedPath != null) {
+                        resolvedPath = getEvalResult(isResolved, min, wait,
+                                
Arrays.asList(DependencyChecker.dependenciesAsArray(resolvedPath.toString())))
+                                .getDataSets();
+
+                    }
+
+                    log.trace(MessageFormat.format("Return data is {0}", 
resolvedPath));
+                    log.debug(MessageFormat.format("Resolved status of Data 
set {0} with min {1} and wait {2}  =  {3}",
+                            dataSet, min, wait, 
!StringUtils.isEmpty(resolvedPath)));
+
+                    if ((isInputWaitElapsed(wait) || isResolved) && 
!StringUtils.isEmpty(resolvedPath)) {
+                        coordPullInputDependency.addResolvedList(dataSet, 
resolvedPath.toString());
+                    }
+                    else {
+                        cleanPreviousCheckData(coordPullInputDependency, 
dataSet);
+                        if (!isInputWaitElapsed(wait)) {
+                            return new CoordInputLogicEvaluatorResult(
+                                    
CoordInputLogicEvaluatorResult.STATUS.TIMED_WAITING);
+                        }
+                        else {
+                            return new 
CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE);
+                        }
+                    }
+                }
+                
coordPullInputDependency.getUnResolvedDependency(dataSet).setResolved(true);
+                return new CoordInputLogicEvaluatorResult(STATUS.TRUE, 
getListAsString(coordPullInputDependency
+                        .getUnResolvedDependency(dataSet).getResolvedList(), 
dataSet));
+
+            }
+        }
+        catch (Exception e) {
+            throw new RuntimeException(" event not found" + e, e);
+
+        }
+
+    }
+
+    private void cleanPreviousCheckData(CoordPullInputDependency 
coordPullInputDependency, String dataSet) {
+        // Previous check might have resolved and added resolved list. Cleanup 
any resolved list stored by previous
+        // check.
+        if (coordPullInputDependency.getUnResolvedDependency(dataSet) != null) 
{
+            
coordPullInputDependency.getUnResolvedDependency(dataSet).setResolvedList(new 
ArrayList<String>());
+        }
+
+    }
+
+    @Override
+    public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, 
int min, int wait) {
+        throw new RuntimeException("Combine is not supported for 
latest/future");
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private Element getInputSetEvent(String name) throws JDOMException {
+        Element eAction = 
XmlUtils.parseXml(coordAction.getActionXml().toString());
+        Element inputList = eAction.getChild("input-events", 
eAction.getNamespace());
+        List<Element> eDataEvents = inputList.getChildren("data-in", 
eAction.getNamespace());
+        for (Element dEvent : eDataEvents) {
+            if (dEvent.getAttribute("name").getValue().equals(name)) {
+                return dEvent;
+            }
+        }
+        throw new RuntimeException("Event not found");
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseValidate.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseValidate.java
 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseValidate.java
new file mode 100644
index 0000000..f485296
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseValidate.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.coord.input.dependency.CoordInputInstance;
+import org.apache.oozie.coord.input.dependency.CoordPullInputDependency;
+import org.apache.oozie.coord.input.dependency.CoordPushInputDependency;
+
+public class CoordInputLogicEvaluatorPhaseValidate implements 
CoordInputLogicEvaluator {
+
+    CoordPullInputDependency coordPullInputDependency;
+    CoordPushInputDependency coordPushInputDependency;
+
+    protected Map<String, List<CoordInputInstance>> dependencyMap;
+    protected CoordinatorActionBean coordAction = null;
+    protected CoordinatorJob coordJob = null;
+
+    public CoordInputLogicEvaluatorPhaseValidate(CoordinatorActionBean 
coordAction) {
+        this.coordAction = coordAction;
+        coordPullInputDependency = (CoordPullInputDependency) 
coordAction.getPullInputDependencies();
+        coordPushInputDependency = (CoordPushInputDependency) 
coordAction.getPushInputDependencies();
+
+    }
+
+    @Override
+    public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, 
int wait) {
+        getDataSetLen(dataSet);
+        return new 
CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE);
+    }
+
+    @Override
+    public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, 
int min, int wait) {
+        if (inputSets.length <= 1) {
+            throw new RuntimeException("Combine should have at least two input 
sets. DataSets : "
+                    + Arrays.toString(inputSets));
+        }
+        int firstInputSetLen = getDataSetLen(inputSets[0]);
+        for (int i = 1; i < inputSets.length; i++) {
+            if (getDataSetLen(inputSets[i]) != firstInputSetLen) {
+                throw new RuntimeException("Combine should have same range. 
DataSets : " + Arrays.toString(inputSets));
+            }
+            if (coordPullInputDependency.getUnResolvedDependency(inputSets[i]) 
!= null) {
+                throw new RuntimeException("Combine is not supported for 
latest/future");
+            }
+        }
+        return new 
CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE);
+    }
+
+    private int getDataSetLen(String dataset) {
+        if (coordAction.getPullInputDependencies() != null) {
+            if (coordPullInputDependency.getDependencyMap().get(dataset) != 
null) {
+                return 
coordPullInputDependency.getDependencyMap().get(dataset).size();
+            }
+
+            if (coordPullInputDependency.getUnResolvedDependency(dataset) != 
null) {
+                return 1;
+            }
+
+        }
+        if (coordAction.getPushInputDependencies() != null) {
+            if (coordPushInputDependency.getDependencyMap().get(dataset) != 
null) {
+                return 
coordPushInputDependency.getDependencyMap().get(dataset).size();
+            }
+        }
+        throw new RuntimeException(" Data set not found : " + dataset);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorResult.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorResult.java
 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorResult.java
new file mode 100644
index 0000000..2f3f034
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorResult.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.coord.CoordELFunctions;
+
+public class CoordInputLogicEvaluatorResult {
+
+    private STATUS status;
+    private String dataSets;
+
+    public static enum STATUS {
+        TRUE, FALSE, PHASE_TWO_EVALUATION, TIMED_WAITING
+    }
+
+    public CoordInputLogicEvaluatorResult() {
+    }
+
+    public CoordInputLogicEvaluatorResult(STATUS status, String dataSets) {
+        this.status = status;
+        this.dataSets = dataSets;
+    }
+
+    public CoordInputLogicEvaluatorResult(STATUS status) {
+        this.status = status;
+    }
+
+    public String getDataSets() {
+        return dataSets;
+    }
+
+    public void setDataSets(String dataSets) {
+        this.dataSets = dataSets;
+    }
+
+    public void appendDataSets(String inputDataSets) {
+        if (StringUtils.isEmpty(inputDataSets)) {
+            return;
+        }
+        if (StringUtils.isEmpty(this.dataSets)) {
+            this.dataSets = inputDataSets;
+        }
+        else {
+            this.dataSets = this.dataSets + CoordELFunctions.DIR_SEPARATOR + 
inputDataSets;
+        }
+    }
+
+    public void setStatus(STATUS status) {
+        this.status = status;
+    }
+
+    public STATUS getStatus() {
+        return status;
+    }
+
+    public boolean isTrue() {
+        if (status == null) {
+            return false;
+        }
+        switch (status) {
+            case TIMED_WAITING:
+            case PHASE_TWO_EVALUATION:
+            case TRUE:
+                return true;
+            default:
+                return false;
+        }
+
+    }
+
+    public boolean isWaiting() {
+        if (status == null) {
+            return false;
+        }
+        return status.equals(STATUS.TIMED_WAITING);
+
+    }
+
+    public boolean isPhaseTwoEvaluation() {
+        if (status == null) {
+            return false;
+        }
+        return status.equals(STATUS.PHASE_TWO_EVALUATION);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorUtil.java
 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorUtil.java
new file mode 100644
index 0000000..63c0760
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorUtil.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import java.util.Date;
+
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlContext;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.jexl2.NamespaceResolver;
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.coord.CoordUtils;
+import org.apache.oozie.coord.SyncCoordAction;
+import org.apache.oozie.coord.input.dependency.CoordPullInputDependency;
+import org.apache.oozie.coord.input.dependency.CoordPushInputDependency;
+import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+
+public class CoordInputLogicEvaluatorUtil {
+
+    private CoordinatorActionBean coordAction = null;
+    private XLog log = XLog.getLog(getClass());
+
+    public CoordInputLogicEvaluatorUtil(CoordinatorActionBean coordAction) {
+        this.coordAction = coordAction;
+        LogUtils.setLogInfo(coordAction);
+
+    }
+
+    public CoordInputLogicEvaluatorUtil() {
+    }
+
+    /**
+     * Check pull missing dependencies.
+     *
+     * @return true, if successful
+     * @throws JDOMException the JDOM exception
+     */
+    public boolean checkPullMissingDependencies() throws JDOMException {
+        JexlEngine jexl = new OozieJexlEngine();
+
+        String expression = 
CoordUtils.getInputLogic(coordAction.getActionXml().toString());
+        if (StringUtils.isEmpty(expression)) {
+            return true;
+        }
+        Expression e = jexl.createExpression(expression);
+
+        JexlContext jc = new OozieJexlParser(jexl, new 
CoordInputLogicBuilder(new CoordInputLogicEvaluatorPhaseOne(
+                coordAction, coordAction.getPullInputDependencies())));
+        CoordInputLogicEvaluatorResult result = 
(CoordInputLogicEvaluatorResult) e.evaluate(jc);
+        log.debug("Input logic expression for [{0}] and evaluate result is 
[{1}]", expression, result.isTrue());
+
+        if (result.isWaiting()) {
+            return false;
+        }
+        return result.isTrue();
+    }
+
+    /**
+     * Validate input logic.
+     *
+     * @throws JDOMException the JDOM exception
+     * @throws CommandException
+     */
+    public void validateInputLogic() throws JDOMException, CommandException {
+        JexlEngine jexl = new OozieJexlEngine();
+        String expression = 
CoordUtils.getInputLogic(coordAction.getActionXml().toString());
+        if (StringUtils.isEmpty(expression)) {
+            return;
+        }
+        Expression e = jexl.createExpression(expression);
+        JexlContext jc = new OozieJexlParser(jexl, new CoordInputLogicBuilder(
+                new CoordInputLogicEvaluatorPhaseValidate(coordAction)));
+        try {
+            Object result = e.evaluate(jc);
+            log.debug("Input logic expression is [{0}] and evaluate result is 
[{1}]", expression, result);
+
+        }
+        catch (RuntimeException re) {
+            throw new CommandException(ErrorCode.E1028, 
re.getCause().getMessage());
+        }
+
+    }
+
+    /**
+     * Get input dependencies.
+     *
+     * @param name the name
+     * @param syncCoordAction the sync coord action
+     * @return the string
+     * @throws JDOMException the JDOM exception
+     */
+    public String getInputDependencies(String name, SyncCoordAction 
syncCoordAction) throws JDOMException {
+        JexlEngine jexl = new OozieJexlEngine();
+
+        CoordinatorActionBean coordAction = new CoordinatorActionBean();
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        coordAction.setId(syncCoordAction.getActionId());
+        Element eJob = 
XmlUtils.parseXml(eval.getVariable(".actionInputLogic").toString());
+        String expression = new InputLogicParser().parseWithName(eJob, name);
+
+        Expression e = jexl.createExpression(expression);
+
+        CoordPullInputDependency pull = (CoordPullInputDependency) 
syncCoordAction.getPullDependencies();
+        CoordPushInputDependency push = (CoordPushInputDependency) 
syncCoordAction.getPushDependencies();
+
+        coordAction.setPushInputDependencies(push);
+
+        coordAction.setPullInputDependencies(pull);
+
+        JexlContext jc = new OozieJexlParser(jexl, new 
CoordInputLogicBuilder(new CoordInputLogicEvaluatorPhaseThree(
+                coordAction, eval)));
+        CoordInputLogicEvaluatorResult result = 
(CoordInputLogicEvaluatorResult) e.evaluate(jc);
+        log.debug("Input logic expression for [{0}] is [{1}] and evaluate 
result is [{2}]", name, expression,
+                result.isTrue());
+
+        if (!result.isTrue()) {
+            return name + " is not resolved";
+        }
+        return result.getDataSets();
+
+    }
+
+    /**
+     * Check push dependencies.
+     *
+     * @return true, if successful
+     * @throws JDOMException the JDOM exception
+     */
+    public boolean checkPushDependencies() throws JDOMException {
+        JexlEngine jexl = new OozieJexlEngine();
+
+        String expression = 
CoordUtils.getInputLogic(coordAction.getActionXml().toString());
+        if (StringUtils.isEmpty(expression)) {
+            return true;
+        }
+
+        Expression e = jexl.createExpression(expression);
+        JexlContext jc = new OozieJexlParser(jexl, new 
CoordInputLogicBuilder(new CoordInputLogicEvaluatorPhaseOne(
+                coordAction, coordAction.getPushInputDependencies())));
+        CoordInputLogicEvaluatorResult result = 
(CoordInputLogicEvaluatorResult) e.evaluate(jc);
+        log.debug("Input logic expression for [{0}] and evaluate result is 
[{1}]", expression, result.isTrue());
+
+        if (result.isWaiting()) {
+            return false;
+        }
+        return result.isTrue();
+    }
+
+    /**
+     * Check unresolved.
+     *
+     * @param actualTime the actual time
+     * @return true, if successful
+     * @throws JDOMException the JDOM exception
+     */
+    public boolean checkUnResolved(Date actualTime) throws JDOMException {
+        JexlEngine jexl = new OozieJexlEngine();
+
+        String expression = 
CoordUtils.getInputLogic(coordAction.getActionXml().toString());
+        if (StringUtils.isEmpty(expression)) {
+            return true;
+        }
+
+        Expression e = jexl.createExpression(expression);
+        JexlContext jc = new OozieJexlParser(jexl, new 
CoordInputLogicBuilder(new CoordInputLogicEvaluatorPhaseTwo(
+                coordAction, actualTime)));
+        CoordInputLogicEvaluatorResult result = 
(CoordInputLogicEvaluatorResult) e.evaluate(jc);
+        log.debug("Input logic expression for [{0}] and evaluate result is 
[{1}]", expression, result.isTrue());
+
+        if (result.isWaiting()) {
+            return false;
+        }
+        return result.isTrue();
+
+    }
+
+    public class OozieJexlParser implements JexlContext, NamespaceResolver {
+        private final JexlEngine jexl;
+        private final CoordInputLogicBuilder object;
+
+        @Override
+        public Object resolveNamespace(String name) {
+            return object;
+        }
+
+        public OozieJexlParser(JexlEngine engine, CoordInputLogicBuilder 
wrapped) {
+            this.jexl = engine;
+            this.object = wrapped;
+        }
+
+        public Object get(String name) {
+            return jexl.getProperty(object, name);
+        }
+
+        public void set(String name, Object value) {
+            jexl.setProperty(object, name, value);
+        }
+
+        public boolean has(String name) {
+            return jexl.getUberspect().getPropertyGet(object, name, null) != 
null;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/InputLogicParser.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/logic/InputLogicParser.java 
b/core/src/main/java/org/apache/oozie/coord/input/logic/InputLogicParser.java
new file mode 100644
index 0000000..f1f6b41
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/logic/InputLogicParser.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.jdom.Element;
+import org.jdom.Namespace;
+
+/**
+ * Parses an input-logic XML element tree into a JEXL expression string.
+ * <p>
+ * A {@code data-in} element becomes a {@code dependencyBuilder.input("dataset")...build()} call, a
+ * {@code combine} element becomes a {@code dependencyBuilder.combine("dataset",...)...build()} call, and
+ * {@code and} / {@code or} elements join the expressions of their children with {@code &&} / {@code ||}.
+ * The {@code min} and {@code wait} attributes of an element apply to its descendants unless a descendant
+ * sets its own value.
+ */
+public class InputLogicParser {
+
+    public final static String COORD_INPUT_EVENTS_DATA_IN = "data-in";
+
+    public final static String AND = "and";
+
+    public final static String OR = "or";
+
+    public final static String COMBINE = "combine";
+
+    /**
+     * Parses every child of the given element.
+     *
+     * @param root the element whose children are parsed, may be {@code null}
+     * @return the generated expression, or an empty string when {@code root} is {@code null}
+     */
+    public String parse(Element root) {
+        return parseWithName(root, null);
+    }
+
+    /**
+     * Parses the children of the given element, optionally restricted to elements with a given
+     * {@code name} attribute.
+     * <p>
+     * A child is translated when {@code name} is {@code null} or equals the child's {@code name}
+     * attribute; otherwise the search continues recursively inside that child. The resulting pieces are
+     * concatenated without a separator.
+     *
+     * @param root the element whose children are parsed, may be {@code null}
+     * @param name the {@code name} attribute to look for, or {@code null} to translate every child
+     * @return the generated expression, or an empty string when {@code root} is {@code null}
+     */
+    @SuppressWarnings("unchecked")
+    public String parseWithName(Element root, String name) {
+        if (root == null) {
+            return "";
+        }
+        StringBuilder parsedString = new StringBuilder();
+
+        List<Element> children = root.getChildren();
+        for (Element child : children) {
+            String childName = child.getAttributeValue("name");
+            String min = child.getAttributeValue("min");
+            String wait = child.getAttributeValue("wait");
+
+            if (name == null || name.equals(childName)) {
+                parsedString.append(parse(child, getOpt(child.getName()), min, wait));
+            }
+            else {
+                parsedString.append(parseWithName(child, name));
+            }
+        }
+        return parsedString.toString();
+    }
+
+    /**
+     * Translates a single element into its JEXL expression.
+     * <p>
+     * The element is handled as the first matching case of: a {@code combine} element, an {@code and} or
+     * {@code or} element, an element with at least one {@code data-in} child, a {@code data-in} element.
+     * The first three cases are wrapped in parentheses; any other element yields an empty string.
+     *
+     * @param root the element to translate
+     * @param opt the operator supplied by the caller; only handed on when {@code root} is an
+     *        {@code and} / {@code or} element
+     * @param min the {@code min} value used when the element does not define one
+     * @param wait the {@code wait} value used when the element does not define one
+     * @return the generated expression, possibly empty
+     */
+    public String parse(Element root, String opt, String min, String wait) {
+        StringBuilder parsedString = new StringBuilder();
+
+        Namespace ns = root.getNamespace();
+        if (root.getName().equals(COMBINE)) {
+            parsedString.append("(");
+            parsedString.append(processCombinedNode(root, getOpt(root.getName()), getMin(root, min),
+                    getWait(root, wait)));
+            parsedString.append(")");
+        }
+        else if (root.getName().equals(AND) || root.getName().equals(OR)) {
+            parsedString.append("(");
+            parsedString.append(parseAllChildren(root, opt, getOpt(root.getName()), getMin(root, min),
+                    getWait(root, wait)));
+            parsedString.append(")");
+        }
+        else if (root.getChild(COORD_INPUT_EVENTS_DATA_IN, ns) != null) {
+            parsedString.append("(");
+            parsedString.append(processChildNode(root, getOpt(root.getName()), getMin(root, min),
+                    getWait(root, wait)));
+            parsedString.append(")");
+        }
+        else if (root.getName().equals(COORD_INPUT_EVENTS_DATA_IN)) {
+            parsedString.append(parseDataInNode(root, min, wait));
+        }
+        return parsedString.toString();
+    }
+
+    /**
+     * Translates every child of an {@code and} / {@code or} element and joins the results with the
+     * element's operator.
+     *
+     * @param root the element whose children are translated
+     * @param parentOpt the operator of the enclosing element; currently not used
+     * @param opt the operator placed between the children; no separator is written when it is empty
+     * @param min the {@code min} value used for children that do not define one
+     * @param wait the {@code wait} value used for children that do not define one
+     * @return the joined expression of all children
+     */
+    @SuppressWarnings("unchecked")
+    private String parseAllChildren(Element root, String parentOpt, String opt, String min, String wait) {
+        StringBuilder parsedString = new StringBuilder();
+
+        List<Element> children = root.getChildren();
+        for (int i = 0; i < children.size(); i++) {
+            Element child = children.get(i);
+            // A child's own attribute takes precedence over the value handed down to it.
+            String currentMin = getMin(child, min);
+            String currentWait = getWait(child, wait);
+            parsedString.append(parse(child, opt, currentMin, currentWait));
+            if (i < children.size() - 1 && !StringUtils.isEmpty(opt)) {
+                parsedString.append(" ").append(opt).append(" ");
+            }
+        }
+        return parsedString.toString();
+    }
+
+    /**
+     * Builds the {@code dependencyBuilder.input(...)} call for a {@code data-in} element.
+     *
+     * @param root the {@code data-in} element; its {@code dataset} attribute is used as the input name
+     * @param min the {@code min} value used when the element does not define one
+     * @param wait the {@code wait} value used when the element does not define one
+     * @return the builder call, terminated by {@code .build()}
+     */
+    private String parseDataInNode(Element root, String min, String wait) {
+        StringBuilder parsedString = new StringBuilder();
+
+        String nestedChildDataName = root.getAttributeValue("dataset");
+
+        parsedString.append("dependencyBuilder.input(\"").append(nestedChildDataName).append("\")");
+        appendMin(root, min, parsedString);
+        appendWait(root, wait, parsedString);
+        parsedString.append(".build()");
+        return parsedString.toString();
+    }
+
+    /**
+     * Translates the {@code data-in} children of an element and joins them with the given operator.
+     *
+     * @param root the element whose {@code data-in} children (in the element's namespace) are translated
+     * @param opt the operator written between two consecutive children
+     * @param min the {@code min} value used for children that do not define one
+     * @param wait the {@code wait} value used for children that do not define one
+     * @return the joined expression of all {@code data-in} children
+     */
+    @SuppressWarnings("unchecked")
+    private String processChildNode(final Element root, final String opt, final String min, final String wait) {
+        StringBuilder parsedString = new StringBuilder();
+
+        Namespace ns = root.getNamespace();
+
+        List<Element> children = root.getChildren(COORD_INPUT_EVENTS_DATA_IN, ns);
+
+        for (int i = 0; i < children.size(); i++) {
+            parsedString.append(parseDataInNode(children.get(i), min, wait));
+
+            if (i < children.size() - 1) {
+                parsedString.append(" ").append(opt).append(" ");
+            }
+        }
+        return parsedString.toString();
+    }
+
+    /**
+     * Builds the {@code dependencyBuilder.combine(...)} call for a {@code combine} element.
+     *
+     * @param root the {@code combine} element; the {@code dataset} attributes of its {@code data-in}
+     *        children become the quoted, comma-separated arguments
+     * @param opt the operator of the element; currently not used
+     * @param min the {@code min} value used when the element does not define one
+     * @param wait the {@code wait} value used when the element does not define one
+     * @return the builder call, terminated by {@code .build()}
+     */
+    @SuppressWarnings("unchecked")
+    private String processCombinedNode(final Element root, final String opt, final String min, final String wait) {
+        StringBuilder parsedString = new StringBuilder();
+
+        Namespace ns = root.getNamespace();
+
+        List<Element> children = root.getChildren(COORD_INPUT_EVENTS_DATA_IN, ns);
+        parsedString.append("dependencyBuilder.combine(");
+
+        for (int i = 0; i < children.size(); i++) {
+            String nestedChildDataName = children.get(i).getAttributeValue("dataset");
+            parsedString.append("\"").append(nestedChildDataName).append("\"");
+            if (i < children.size() - 1) {
+                parsedString.append(",");
+            }
+        }
+        parsedString.append(")");
+
+        appendMin(root, min, parsedString);
+        appendWait(root, wait, parsedString);
+        parsedString.append(".build()");
+        return parsedString.toString();
+    }
+
+    /**
+     * Maps an element name to its JEXL operator.
+     *
+     * @param opt the element name, compared case-insensitively
+     * @return {@code ||} for "or", {@code &&} for "and", otherwise an empty string
+     */
+    private String getOpt(String opt) {
+        if (opt.equalsIgnoreCase("or")) {
+            return "||";
+        }
+
+        if (opt.equalsIgnoreCase("and")) {
+            return "&&";
+        }
+
+        return "";
+    }
+
+    /**
+     * Resolves the effective {@code min} value of an element.
+     *
+     * @param root the element to inspect
+     * @param parentMin the value handed down by the enclosing element
+     * @return the element's own {@code min} attribute, or {@code parentMin} when it is missing or empty
+     */
+    private String getMin(Element root, String parentMin) {
+        String min = root.getAttributeValue("min");
+        if (StringUtils.isEmpty(min)) {
+            return parentMin;
+        }
+        return min;
+    }
+
+    /**
+     * Resolves the effective {@code wait} value of an element.
+     *
+     * @param root the element to inspect
+     * @param parentWait the value handed down by the enclosing element
+     * @return the element's own {@code wait} attribute, or {@code parentWait} when it is missing or empty
+     */
+    private String getWait(Element root, String parentWait) {
+        String wait = root.getAttributeValue("wait");
+        // Test the element's own value (as getMin does); testing parentWait here would discard an
+        // inherited wait whenever the element itself does not define one.
+        if (StringUtils.isEmpty(wait)) {
+            return parentWait;
+        }
+        return wait;
+    }
+
+    /**
+     * Appends {@code .inputWait(value)} using the element's own {@code wait} attribute, or the given
+     * fallback when the attribute is missing or empty; appends nothing when both are empty.
+     *
+     * @param root the element to inspect
+     * @param wait the fallback value
+     * @param parsedString the expression being built
+     */
+    private void appendWait(final Element root, String wait, StringBuilder parsedString) {
+        String childWait = root.getAttributeValue("wait");
+        if (!StringUtils.isEmpty(childWait)) {
+            parsedString.append(".inputWait(").append(childWait).append(")");
+        }
+        else if (!StringUtils.isEmpty(wait)) {
+            parsedString.append(".inputWait(").append(wait).append(")");
+        }
+    }
+
+    /**
+     * Appends {@code .min(value)} using the element's own {@code min} attribute, or the given fallback
+     * when the attribute is missing or empty; appends nothing when both are empty.
+     *
+     * @param root the element to inspect
+     * @param min the fallback value
+     * @param parsedString the expression being built
+     */
+    private void appendMin(final Element root, String min, StringBuilder parsedString) {
+        String childMin = root.getAttributeValue("min");
+        if (!StringUtils.isEmpty(childMin)) {
+            parsedString.append(".min(").append(childMin).append(")");
+        }
+        else if (!StringUtils.isEmpty(min)) {
+            parsedString.append(".min(").append(min).append(")");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlEngine.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlEngine.java 
b/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlEngine.java
new file mode 100644
index 0000000..66c4f2b
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlEngine.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import org.apache.commons.jexl2.Interpreter;
+import org.apache.commons.jexl2.JexlContext;
+import org.apache.commons.jexl2.JexlEngine;
+
+/**
+ * JEXL engine that evaluates expressions with an {@link OozieJexlInterpreter}.
+ * <p>
+ * The interpreter is built on the first {@code createInterpreter} call and that same instance is returned
+ * by every later call, so the context and flags of later calls are not applied. The interpreter is
+ * always created in strict mode; the {@code strictFlag} argument is not used.
+ */
+public class OozieJexlEngine extends JexlEngine {
+    OozieJexlInterpreter oozieInterpreter;
+
+    public OozieJexlEngine() {
+    }
+
+    /**
+     * Returns the interpreter of this engine, creating it on first use.
+     *
+     * @param context the evaluation context; {@code EMPTY_CONTEXT} is substituted for {@code null}
+     * @param strictFlag not used, the interpreter is always strict
+     * @param silentFlag passed to the interpreter when it is created
+     * @return the single interpreter instance of this engine
+     */
+    protected Interpreter createInterpreter(JexlContext context, boolean strictFlag, boolean silentFlag) {
+        if (oozieInterpreter != null) {
+            return oozieInterpreter;
+        }
+        final JexlContext effectiveContext = (context != null) ? context : EMPTY_CONTEXT;
+        oozieInterpreter = new OozieJexlInterpreter(this, effectiveContext, true, silentFlag);
+        return oozieInterpreter;
+    }
+
+    /**
+     * @return the interpreter created by {@code createInterpreter}, or {@code null} if none was created yet
+     */
+    public OozieJexlInterpreter getOozieInterpreter() {
+        return oozieInterpreter;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlInterpreter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlInterpreter.java
 
b/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlInterpreter.java
new file mode 100644
index 0000000..2044723
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlInterpreter.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import org.apache.commons.jexl2.Interpreter;
+import org.apache.commons.jexl2.JexlContext;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.jexl2.parser.ASTAndNode;
+import org.apache.commons.jexl2.parser.ASTOrNode;
+import org.apache.commons.jexl2.parser.JexlNode;
+
+/**
+ * JEXL interpreter whose {@code ||} and {@code &&} operators work on
+ * {@link CoordInputLogicEvaluatorResult} operands instead of booleans.
+ */
+public class OozieJexlInterpreter extends Interpreter {
+
+    protected OozieJexlInterpreter(Interpreter base) {
+        super(base);
+    }
+
+    /**
+     * Evaluates the given node with this interpreter, passing an empty string as visitor data.
+     */
+    public Object interpret(JexlNode node) {
+        return node.jjtAccept(this, "");
+    }
+
+    public OozieJexlInterpreter(JexlEngine jexlEngine, JexlContext jexlContext, boolean strictFlag,
+            boolean silentFlag) {
+        super(jexlEngine, jexlContext, strictFlag, silentFlag);
+    }
+
+    /**
+     * Evaluates {@code left || right}: the left result is returned when it is true, otherwise the right
+     * operand is evaluated and its result returned. The right operand is not evaluated when the left is true.
+     */
+    public Object visit(ASTOrNode node, Object data) {
+        final JexlNode leftOperand = node.jjtGetChild(0);
+        final CoordInputLogicEvaluatorResult leftResult =
+                (CoordInputLogicEvaluatorResult) leftOperand.jjtAccept(this, data);
+
+        if (!leftResult.isTrue()) {
+            return node.jjtGetChild(1).jjtAccept(this, data);
+        }
+        return leftResult;
+    }
+
+    /**
+     * Evaluates {@code left && right}: the left result is returned when it is not true, otherwise the
+     * right result is returned. When both are true the data sets of the left result are appended to the
+     * right result first.
+     */
+    public Object visit(ASTAndNode node, Object data) {
+        final JexlNode leftOperand = node.jjtGetChild(0);
+        final CoordInputLogicEvaluatorResult leftResult =
+                (CoordInputLogicEvaluatorResult) leftOperand.jjtAccept(this, data);
+
+        if (leftResult.isTrue()) {
+            final CoordInputLogicEvaluatorResult rightResult =
+                    (CoordInputLogicEvaluatorResult) node.jjtGetChild(1).jjtAccept(this, data);
+            if (rightResult.isTrue()) {
+                rightResult.appendDataSets(leftResult.getDataSets());
+            }
+            return rightResult;
+        }
+        return leftResult;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java 
b/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java
index c280d1d..fe7a327 100644
--- a/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java
+++ b/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java
@@ -25,7 +25,7 @@ public class ActionDependency {
     private List<String> missingDependencies;
     private List<String> availableDependencies;
 
-    ActionDependency(List<String> missingDependencies, List<String> 
availableDependencies) {
+    public ActionDependency(List<String> missingDependencies, List<String> 
availableDependencies) {
         this.missingDependencies = missingDependencies;
         this.availableDependencies = availableDependencies;
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java 
b/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
index a550757..bdd854f 100644
--- a/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
+++ b/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
@@ -21,7 +21,9 @@ package org.apache.oozie.dependency;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
@@ -53,6 +55,9 @@ public class DependencyChecker {
      * @return missing dependencies as a array
      */
     public static String[] dependenciesAsArray(String missingDependencies) {
+        if(StringUtils.isEmpty(missingDependencies)){
+            return new String[0];
+        }
         return missingDependencies.split(CoordELFunctions.INSTANCE_SEPARATOR);
     }
 
@@ -69,7 +74,7 @@ public class DependencyChecker {
      */
     public static ActionDependency checkForAvailability(String 
missingDependencies, Configuration actionConf,
             boolean stopOnFirstMissing) throws CommandException {
-        return checkForAvailability(dependenciesAsArray(missingDependencies), 
actionConf, stopOnFirstMissing);
+        return 
checkForAvailability(Arrays.asList(dependenciesAsArray(missingDependencies)), 
actionConf, stopOnFirstMissing);
     }
 
     /**
@@ -83,7 +88,7 @@ public class DependencyChecker {
      * @return ActionDependency which has the list of missing and available 
dependencies
      * @throws CommandException
      */
-    public static ActionDependency checkForAvailability(String[] 
missingDependencies, Configuration actionConf,
+    public static ActionDependency checkForAvailability(List<String> 
missingDependencies, Configuration actionConf,
             boolean stopOnFirstMissing) throws CommandException {
         final XLog LOG = XLog.getLog(DependencyChecker.class); //OOZIE-1251. 
Don't initialize as static variable.
         String user = 
ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), 
OozieClient.USER_NAME);
@@ -92,9 +97,9 @@ public class DependencyChecker {
         URIHandlerService uriService = 
Services.get().get(URIHandlerService.class);
         boolean continueChecking = true;
         try {
-            for (int index = 0; index < missingDependencies.length; index++) {
+            for (int index = 0; index < missingDependencies.size(); index++) {
                 if (continueChecking) {
-                    String dependency = missingDependencies[index];
+                    String dependency = missingDependencies.get(index);
 
                     URI uri = new URI(dependency);
                     URIHandler uriHandler = uriService.getURIHandler(uri);
@@ -113,7 +118,7 @@ public class DependencyChecker {
 
                 }
                 else {
-                    missingDeps.add(missingDependencies[index]);
+                    missingDeps.add(missingDependencies.get(index));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java 
b/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
index 7c1aadf..65d85b8 100644
--- a/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
+++ b/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
@@ -114,6 +114,15 @@ public class FSURIHandler implements URIHandler {
     }
 
     @Override
+    public String getURIWithoutDoneFlag(String uri, String doneFlag) throws 
URIHandlerException {
+        if (doneFlag.length() > 0 && uri.endsWith(doneFlag)) {
+            return uri.substring(0, uri.lastIndexOf("/" + doneFlag));
+        }
+        return uri;
+    }
+
+
+    @Override
     public void validate(String uri) throws URIHandlerException {
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java 
b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
index 1bbf37d..67b37ec 100644
--- a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
+++ b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
@@ -211,6 +211,11 @@ public class HCatURIHandler implements URIHandler {
     }
 
     @Override
+    public String getURIWithoutDoneFlag(String uri, String doneFlag) throws 
URIHandlerException {
+        // The URI is returned as given; doneFlag is not consulted by this handler.
+        return uri;
+    }
+
+    @Override
     public void validate(String uri) throws URIHandlerException {
         try {
             new HCatURI(uri); // will fail if uri syntax is incorrect

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/URIHandler.java 
b/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
index bc94716..45e23fb 100644
--- a/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
+++ b/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
@@ -169,6 +169,19 @@ public interface URIHandler {
     public String getURIWithDoneFlag(String uri, String doneFlag) throws 
URIHandlerException;
 
     /**
+     * Get the URI of the dependency from a URI which has the done flag incorporated
+     *
+     * @param uri URI of the dependency, possibly with the done flag incorporated
+     * @param doneFlag flag that determines URI availability
+     *
+     * @return the final URI without the doneFlag incorporated
+     *
+     * @throws URIHandlerException
+     */
+    public String getURIWithoutDoneFlag(String uri, String doneFlag) throws 
URIHandlerException;
+
+
+    /**
      * Check whether the URI is valid or not
      * @param uri
      * @throws URIHandlerException
@@ -220,4 +233,5 @@ public interface URIHandler {
 
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/util/WritableUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/WritableUtils.java 
b/core/src/main/java/org/apache/oozie/util/WritableUtils.java
index 76a6895..aa027e3 100644
--- a/core/src/main/java/org/apache/oozie/util/WritableUtils.java
+++ b/core/src/main/java/org/apache/oozie/util/WritableUtils.java
@@ -20,6 +20,7 @@ package org.apache.oozie.util;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.oozie.compression.CodecFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -28,12 +29,19 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.DataOutput;
 import java.io.DataInput;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * Utility class to write/read Hadoop writables to/from a byte array.
  */
 public class WritableUtils {
 
+    public static XLog LOG = XLog.getLog(WritableUtils.class);
+
     /**
      * Write a writable to a byte array.
      *
@@ -60,7 +68,6 @@ public class WritableUtils {
      * @param clazz writable class.
      * @return writable deserialized from the byte array.
      */
-    @SuppressWarnings("unchecked")
     public static <T extends Writable> T fromByteArray(byte[] array, Class<T> 
clazz) {
         try {
             T o = (T) ReflectionUtils.newInstance(clazz, null);
@@ -99,4 +106,143 @@ public class WritableUtils {
         String str = dataInput.readUTF();
         return (str.equals(NULL)) ? null : str;
     }
+
+    /**
+     * Reads a list of writables: an int element count followed by that many elements, each created
+     * from {@code clazz} and populated through {@code readFields}.
+     *
+     * @param <T> the writable element type
+     * @param dataInput the input to read from
+     * @param clazz the class instantiated for every element
+     * @return the elements in the order they were read
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static <T extends Writable> List<T> readList(DataInput dataInput, Class<T> clazz) throws IOException {
+        final List<T> elements = new ArrayList<T>();
+        final int size = dataInput.readInt();
+        int index = 0;
+        while (index < size) {
+            final T element = (T) ReflectionUtils.newInstance(clazz, null);
+            element.readFields(dataInput);
+            elements.add(element);
+            index++;
+        }
+        return elements;
+    }
+
+    /**
+     * Reads a list of strings: an int element count followed by that many strings, each read with
+     * {@code readBytesAsString}.
+     *
+     * @param dataInput the input to read from
+     * @return the strings in the order they were read
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static List<String> readStringList(DataInput dataInput) throws IOException {
+        final List<String> values = new ArrayList<String>();
+        final int size = dataInput.readInt();
+        int index = 0;
+        while (index < size) {
+            values.add(readBytesAsString(dataInput));
+            index++;
+        }
+        return values;
+    }
+
+    /**
+     * Writes a list of writables: the element count as an int, then every element through its own
+     * {@code write} method, in list order.
+     *
+     * @param <T> the writable element type
+     * @param dataOutput the output to write to
+     * @param list the elements to write
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static <T extends Writable> void writeList(DataOutput dataOutput, List<T> list) throws IOException {
+        final int size = list.size();
+        dataOutput.writeInt(size);
+        for (T element : list) {
+            element.write(dataOutput);
+        }
+    }
+
+    /**
+     * Writes a list of strings: the element count as an int, then every string with
+     * {@code writeStringAsBytes}, in list order.
+     *
+     * @param dataOutput the output to write to
+     * @param list the strings to write
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static void writeStringList(DataOutput dataOutput, List<String> list) throws IOException {
+        final int size = list.size();
+        dataOutput.writeInt(size);
+        for (String value : list) {
+            writeStringAsBytes(dataOutput, value);
+        }
+    }
+
+    /**
+     * Writes a map with string keys: the entry count as an int, then for every entry the key (with
+     * {@code writeStringAsBytes}) followed by the value (through its own {@code write} method).
+     *
+     * @param <T> the writable value type
+     * @param dataOutput the output to write to
+     * @param map the entries to write
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static <T extends Writable> void writeMap(DataOutput dataOutput, Map<String, T> map) throws IOException {
+        final int size = map.size();
+        dataOutput.writeInt(size);
+        for (Entry<String, T> entry : map.entrySet()) {
+            final String key = entry.getKey();
+            writeStringAsBytes(dataOutput, key);
+            final T value = entry.getValue();
+            value.write(dataOutput);
+        }
+    }
+
+    /**
+     * Writes a map from string keys to lists of writables: the entry count as an int, then for every
+     * entry the key (with {@code writeStringAsBytes}) followed by the list (with {@code writeList}).
+     *
+     * @param <T> the writable element type of the lists
+     * @param dataOutput the output to write to
+     * @param map the entries to write
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static <T extends Writable> void writeMapWithList(DataOutput dataOutput, Map<String, List<T>> map)
+            throws IOException {
+        final int size = map.size();
+        dataOutput.writeInt(size);
+        for (Entry<String, List<T>> entry : map.entrySet()) {
+            final String key = entry.getKey();
+            writeStringAsBytes(dataOutput, key);
+            final List<T> values = entry.getValue();
+            writeList(dataOutput, values);
+        }
+    }
+
+    /**
+     * Reads a map with string keys: an int entry count followed by that many entries, each a key (read
+     * with {@code readBytesAsString}) and a value created from {@code clazz} and populated through
+     * {@code readFields}.
+     *
+     * @param <T> the writable value type
+     * @param dataInput the input to read from
+     * @param clazz the class instantiated for every value
+     * @return a {@code HashMap} holding the entries read
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static <T extends Writable> Map<String, T> readMap(DataInput dataInput, Class<T> clazz) throws IOException {
+        final Map<String, T> entries = new HashMap<String, T>();
+        final int size = dataInput.readInt();
+        int index = 0;
+        while (index < size) {
+            final String key = readBytesAsString(dataInput);
+            final T value = (T) ReflectionUtils.newInstance(clazz, null);
+            value.readFields(dataInput);
+            entries.put(key, value);
+            index++;
+        }
+        return entries;
+    }
+
+    /**
+     * Reads a map from string keys to lists of writables: an int entry count followed by that many
+     * entries, each a key (read with {@code readBytesAsString}) and a list (read with {@code readList}).
+     *
+     * @param <T> the writable element type of the lists
+     * @param dataInput the input to read from
+     * @param clazz the class instantiated for every list element
+     * @return a {@code HashMap} holding the entries read
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static <T extends Writable> Map<String, List<T>> readMapWithList(DataInput dataInput, Class<T> clazz)
+            throws IOException {
+        final Map<String, List<T>> entries = new HashMap<String, List<T>>();
+        final int size = dataInput.readInt();
+        int index = 0;
+        while (index < size) {
+            final String key = readBytesAsString(dataInput);
+            final List<T> values = readList(dataInput, clazz);
+            entries.put(key, values);
+            index++;
+        }
+        return entries;
+    }
+
+    /**
+     * Writes a string as a length-prefixed byte sequence: the string is encoded with
+     * {@code CodecFactory.UTF_8_ENCODING}, then the byte count is written as an int followed by the bytes.
+     *
+     * @param dOut the output to write to
+     * @param value the string to write
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static void writeStringAsBytes(DataOutput dOut, String value) throws IOException {
+        final byte[] encoded = value.getBytes(CodecFactory.UTF_8_ENCODING);
+        final int byteCount = encoded.length;
+        dOut.writeInt(byteCount);
+        dOut.write(encoded);
+    }
+
+    /**
+     * Reads a length-prefixed byte sequence and decodes it with {@code CodecFactory.UTF_8_ENCODING}:
+     * an int byte count followed by exactly that many bytes.
+     *
+     * @param dIn the input to read from
+     * @return the decoded string
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static String readBytesAsString(DataInput dIn) throws IOException {
+        final int byteCount = dIn.readInt();
+        final byte[] encoded = new byte[byteCount];
+        dIn.readFully(encoded);
+        return new String(encoded, CodecFactory.UTF_8_ENCODING);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index ca49fa6..3ff7320 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -1516,7 +1516,7 @@
         <name>oozie.service.SchemaService.coord.schemas</name>
         <value>
             
oozie-coordinator-0.1.xsd,oozie-coordinator-0.2.xsd,oozie-coordinator-0.3.xsd,oozie-coordinator-0.4.xsd,
-            oozie-sla-0.1.xsd,oozie-sla-0.2.xsd
+            oozie-coordinator-0.5.xsd,oozie-sla-0.1.xsd,oozie-sla-0.2.xsd
         </value>
         <description>
             List of schemas for coordinators (separated by commas).

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
index 1fe1b3a..c27a40a 100644
--- 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
@@ -19,7 +19,6 @@
 package org.apache.oozie.command.coord;
 
 import java.io.IOException;
-import java.io.Reader;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
@@ -34,6 +33,7 @@ import org.apache.oozie.client.CoordinatorJob.Execution;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.coord.input.dependency.CoordOldInputDependency;
 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
@@ -553,13 +553,14 @@ public class TestCoordActionInputCheckXCommand extends 
XDataTestCase {
         Path appPath = new Path(getFsTestCaseDir(), "coord");
         String inputDir = appPath.toString() + "/coord-input/2010/07/09/01/00";
         String nonExistDir = inputDir.replaceFirst("localhost", "nonExist");
+        CoordinatorActionBean actionBean = new CoordinatorActionBean();
         try {
-            caicc.pathExists(nonExistDir, new XConfiguration(), getTestUser());
+            new CoordOldInputDependency().pathExists(actionBean, nonExistDir, 
new XConfiguration(), getTestUser());
             fail("Should throw exception due to non-existent NN path. 
Therefore fail");
         }
         catch (IOException ioe) {
-            assertEquals(caicc.getCoordActionErrorCode(), "E0901");
-            assertTrue(caicc.getCoordActionErrorMsg().contains("not in Oozie's 
whitelist"));
+            assertEquals(actionBean.getErrorCode(), "E0901");
+            assertTrue(actionBean.getErrorMessage().contains("not in Oozie's 
whitelist"));
         }
     }
 

Reply via email to