http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseThree.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseThree.java b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseThree.java new file mode 100644 index 0000000..31cf081 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseThree.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.coord.input.logic; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.coord.input.dependency.AbstractCoordInputDependency; +import org.apache.oozie.coord.input.dependency.CoordInputInstance; +import org.apache.oozie.dependency.URIHandler; +import org.apache.oozie.dependency.URIHandlerException; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.URIHandlerService; +import org.apache.oozie.util.ELEvaluator; + +public class CoordInputLogicEvaluatorPhaseThree extends CoordInputLogicEvaluatorPhaseOne { + + ELEvaluator eval; + + public CoordInputLogicEvaluatorPhaseThree(CoordinatorActionBean coordAction, ELEvaluator eval) { + super(coordAction, (AbstractCoordInputDependency) coordAction.getPullInputDependencies()); + this.eval = eval; + } + + public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) { + return getResultFromPullPush(coordAction, dataSet, min); + + } + + public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) { + return combine(coordInputDependency, inputSets, min, wait); + } + + public CoordInputLogicEvaluatorResult combine(AbstractCoordInputDependency coordInputDependency, + String[] inputSets, int min, int wait) { + + List<String> availableList = new ArrayList<String>(); + + if (coordInputDependency.getDependencyMap().get(inputSets[0]) == null) { + return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE); + } + + try { + String firstInputSet = inputSets[0]; + List<CoordInputInstance> firstInputSetList = coordInputDependency.getDependencyMap().get(firstInputSet); + for (int i = 0; i < firstInputSetList.size(); i++) { + CoordInputInstance coordInputInstance = firstInputSetList.get(i); + if (!coordInputInstance.isAvailable()) { + for (int j = 1; j < inputSets.length; j++) { + if (coordInputDependency.getDependencyMap().get(inputSets[j]).get(i).isAvailable()) { + availableList.add(getPathWithoutDoneFlag( + coordInputDependency.getDependencyMap().get(inputSets[j]).get(i) + .getInputDataInstance(), inputSets[j])); + } + } + } + + else { + availableList.add(getPathWithoutDoneFlag(coordInputInstance.getInputDataInstance(), firstInputSet)); + } + } + } + catch (Exception e) { + log.error(e); + throw new RuntimeException(ErrorCode.E1028.format("Error executing combine function " + e.getMessage())); + } + boolean allFound = availableList.size() == coordInputDependency.getDependencyMap().get(inputSets[0]).size(); + return getEvalResult(allFound, min, wait, availableList); + } + + protected boolean pathExists(String sPath, Configuration actionConf) throws IOException, URISyntaxException, + URIHandlerException { + return false; + } + + public boolean isInputWaitElapsed(int timeInMin) { + return true; + } + + public String getListAsString(List<String> input, String dataSet) { + if (input == null || input.isEmpty()) { + return ""; + } + StringBuilder sb = new StringBuilder(); + try { + + for (int i = 1; i < input.size(); i++) { + sb.append(getPathWithoutDoneFlag(input.get(i - 1), dataSet)).append(","); + } + sb.append(getPathWithoutDoneFlag(input.get(input.size() - 1), dataSet)); + } + catch (URIHandlerException e) { + log.error(e); + throw new RuntimeException(ErrorCode.E1028.format("Error finding path without done flag " + e.getMessage())); + } + + return sb.toString(); + } + + private String getPathWithoutDoneFlag(String sPath, String dataSet) throws URIHandlerException { + if (dataSet == null) { + return sPath; + } + URIHandlerService service = Services.get().get(URIHandlerService.class); + URIHandler handler = service.getURIHandler(sPath); + return handler.getURIWithoutDoneFlag(sPath, eval.getVariable(".datain." + dataSet + ".doneFlag").toString()); + } + +}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseTwo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseTwo.java b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseTwo.java new file mode 100644 index 0000000..16fc400 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseTwo.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.coord.input.logic; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.coord.CoordELConstants; +import org.apache.oozie.coord.CoordELEvaluator; +import org.apache.oozie.coord.CoordELFunctions; +import org.apache.oozie.coord.input.dependency.AbstractCoordInputDependency; +import org.apache.oozie.coord.input.dependency.CoordPullInputDependency; +import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorResult.STATUS; +import org.apache.oozie.dependency.DependencyChecker; +import org.apache.oozie.util.ELEvaluator; +import org.apache.oozie.util.XmlUtils; +import org.jdom.Element; +import org.jdom.JDOMException; + +public class CoordInputLogicEvaluatorPhaseTwo extends CoordInputLogicEvaluatorPhaseOne { + + Date actualTime; + + public CoordInputLogicEvaluatorPhaseTwo(CoordinatorActionBean coordAction, Date actualTime) { + super(coordAction); + this.actualTime = actualTime; + } + + public CoordInputLogicEvaluatorPhaseTwo(CoordinatorActionBean coordAction, + AbstractCoordInputDependency coordInputDependency) { + super(coordAction, coordInputDependency); + } + + @Override + public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) { + try { + CoordPullInputDependency coordPullInputDependency = (CoordPullInputDependency) coordInputDependency; + ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, coordAction.getNominalTime(), + getInputSetEvent(dataSet), getConf()); + if (coordPullInputDependency.getUnResolvedDependency(dataSet) == null) { + return super.evalInput(dataSet, min, wait); + + } + else { + cleanPreviousCheckData(coordPullInputDependency, dataSet); + List<String> unresolvedList = coordPullInputDependency.getUnResolvedDependency(dataSet) + .getDependencies(); + for (String unresolved : unresolvedList) { + String resolvedPath = ""; + + CoordELFunctions.evalAndWrap(eval, unresolved); + boolean isResolved = (Boolean) eval.getVariable(CoordELConstants.IS_RESOLVED); + + coordPullInputDependency.setDependencyMap(dependencyMap); + if (eval.getVariable(CoordELConstants.RESOLVED_PATH) != null) { + resolvedPath = eval.getVariable(CoordELConstants.RESOLVED_PATH).toString(); + } + if (resolvedPath != null) { + resolvedPath = getEvalResult(isResolved, min, wait, + Arrays.asList(DependencyChecker.dependenciesAsArray(resolvedPath.toString()))) + .getDataSets(); + + } + + log.trace(MessageFormat.format("Return data is {0}", resolvedPath)); + log.debug(MessageFormat.format("Resolved status of Data set {0} with min {1} and wait {2} = {3}", + dataSet, min, wait, !StringUtils.isEmpty(resolvedPath))); + + if ((isInputWaitElapsed(wait) || isResolved) && !StringUtils.isEmpty(resolvedPath)) { + coordPullInputDependency.addResolvedList(dataSet, resolvedPath.toString()); + } + else { + cleanPreviousCheckData(coordPullInputDependency, dataSet); + if (!isInputWaitElapsed(wait)) { + return new CoordInputLogicEvaluatorResult( + CoordInputLogicEvaluatorResult.STATUS.TIMED_WAITING); + } + else { + return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE); + } + } + } + coordPullInputDependency.getUnResolvedDependency(dataSet).setResolved(true); + return new CoordInputLogicEvaluatorResult(STATUS.TRUE, getListAsString(coordPullInputDependency + .getUnResolvedDependency(dataSet).getResolvedList(), dataSet)); + + } + } + catch (Exception e) { + throw new RuntimeException(" event not found" + e, e); + + } + + } + + private void cleanPreviousCheckData(CoordPullInputDependency coordPullInputDependency, String dataSet) { + // Previous check might have resolved and added resolved list. Cleanup any resolved list stored by previous + // check. + if (coordPullInputDependency.getUnResolvedDependency(dataSet) != null) { + coordPullInputDependency.getUnResolvedDependency(dataSet).setResolvedList(new ArrayList<String>()); + } + + } + + @Override + public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) { + throw new RuntimeException("Combine is not supported for latest/future"); + + } + + @SuppressWarnings("unchecked") + private Element getInputSetEvent(String name) throws JDOMException { + Element eAction = XmlUtils.parseXml(coordAction.getActionXml().toString()); + Element inputList = eAction.getChild("input-events", eAction.getNamespace()); + List<Element> eDataEvents = inputList.getChildren("data-in", eAction.getNamespace()); + for (Element dEvent : eDataEvents) { + if (dEvent.getAttribute("name").getValue().equals(name)) { + return dEvent; + } + } + throw new RuntimeException("Event not found"); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseValidate.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseValidate.java b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseValidate.java new file mode 100644 index 0000000..f485296 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorPhaseValidate.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.coord.input.logic; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.coord.input.dependency.CoordInputInstance; +import org.apache.oozie.coord.input.dependency.CoordPullInputDependency; +import org.apache.oozie.coord.input.dependency.CoordPushInputDependency; + +public class CoordInputLogicEvaluatorPhaseValidate implements CoordInputLogicEvaluator { + + CoordPullInputDependency coordPullInputDependency; + CoordPushInputDependency coordPushInputDependency; + + protected Map<String, List<CoordInputInstance>> dependencyMap; + protected CoordinatorActionBean coordAction = null; + protected CoordinatorJob coordJob = null; + + public CoordInputLogicEvaluatorPhaseValidate(CoordinatorActionBean coordAction) { + this.coordAction = coordAction; + coordPullInputDependency = (CoordPullInputDependency) coordAction.getPullInputDependencies(); + coordPushInputDependency = (CoordPushInputDependency) coordAction.getPushInputDependencies(); + + } + + @Override + public CoordInputLogicEvaluatorResult evalInput(String dataSet, int min, int wait) { + getDataSetLen(dataSet); + return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE); + } + + @Override + public CoordInputLogicEvaluatorResult evalCombineInput(String[] inputSets, int min, int wait) { + if (inputSets.length <= 1) { + throw new RuntimeException("Combine should have at least two input sets. DataSets : " + + Arrays.toString(inputSets)); + } + int firstInputSetLen = getDataSetLen(inputSets[0]); + for (int i = 1; i < inputSets.length; i++) { + if (getDataSetLen(inputSets[i]) != firstInputSetLen) { + throw new RuntimeException("Combine should have same range. DataSets : " + Arrays.toString(inputSets)); + } + if (coordPullInputDependency.getUnResolvedDependency(inputSets[i]) != null) { + throw new RuntimeException("Combine is not supported for latest/future"); + } + } + return new CoordInputLogicEvaluatorResult(CoordInputLogicEvaluatorResult.STATUS.FALSE); + } + + private int getDataSetLen(String dataset) { + if (coordAction.getPullInputDependencies() != null) { + if (coordPullInputDependency.getDependencyMap().get(dataset) != null) { + return coordPullInputDependency.getDependencyMap().get(dataset).size(); + } + + if (coordPullInputDependency.getUnResolvedDependency(dataset) != null) { + return 1; + } + + } + if (coordAction.getPushInputDependencies() != null) { + if (coordPushInputDependency.getDependencyMap().get(dataset) != null) { + return coordPushInputDependency.getDependencyMap().get(dataset).size(); + } + } + throw new RuntimeException(" Data set not found : " + dataset); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorResult.java b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorResult.java new file mode 100644 index 0000000..2f3f034 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorResult.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.coord.input.logic; + +import org.apache.commons.lang.StringUtils; +import org.apache.oozie.coord.CoordELFunctions; + +public class CoordInputLogicEvaluatorResult { + + private STATUS status; + private String dataSets; + + public static enum STATUS { + TRUE, FALSE, PHASE_TWO_EVALUATION, TIMED_WAITING + } + + public CoordInputLogicEvaluatorResult() { + } + + public CoordInputLogicEvaluatorResult(STATUS status, String dataSets) { + this.status = status; + this.dataSets = dataSets; + } + + public CoordInputLogicEvaluatorResult(STATUS status) { + this.status = status; + } + + public String getDataSets() { + return dataSets; + } + + public void setDataSets(String dataSets) { + this.dataSets = dataSets; + } + + public void appendDataSets(String inputDataSets) { + if (StringUtils.isEmpty(inputDataSets)) { + return; + } + if (StringUtils.isEmpty(this.dataSets)) { + this.dataSets = inputDataSets; + } + else { + this.dataSets = this.dataSets + CoordELFunctions.DIR_SEPARATOR + inputDataSets; + } + } + + public void setStatus(STATUS status) { + this.status = status; + } + + public STATUS getStatus() { + return status; + } + + public boolean isTrue() { + if (status == null) { + return false; + } + switch (status) { + case TIMED_WAITING: + case PHASE_TWO_EVALUATION: + case TRUE: + return true; + default: + return false; + } + + } + + public boolean isWaiting() { + if (status == null) { + return false; + } + return status.equals(STATUS.TIMED_WAITING); + + } + + public boolean isPhaseTwoEvaluation() { + if (status == null) { + return false; + } + return status.equals(STATUS.PHASE_TWO_EVALUATION); + + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorUtil.java b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorUtil.java new file mode 100644 index 0000000..63c0760 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/logic/CoordInputLogicEvaluatorUtil.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.coord.input.logic; + +import java.util.Date; + +import org.apache.commons.jexl2.Expression; +import org.apache.commons.jexl2.JexlContext; +import org.apache.commons.jexl2.JexlEngine; +import org.apache.commons.jexl2.NamespaceResolver; +import org.apache.commons.lang.StringUtils; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.coord.CoordUtils; +import org.apache.oozie.coord.SyncCoordAction; +import org.apache.oozie.coord.input.dependency.CoordPullInputDependency; +import org.apache.oozie.coord.input.dependency.CoordPushInputDependency; +import org.apache.oozie.util.ELEvaluator; +import org.apache.oozie.util.LogUtils; +import org.apache.oozie.util.XLog; +import org.apache.oozie.util.XmlUtils; +import org.jdom.Element; +import org.jdom.JDOMException; + +public class CoordInputLogicEvaluatorUtil { + + private CoordinatorActionBean coordAction = null; + private XLog log = XLog.getLog(getClass()); + + public CoordInputLogicEvaluatorUtil(CoordinatorActionBean coordAction) { + this.coordAction = coordAction; + LogUtils.setLogInfo(coordAction); + + } + + public CoordInputLogicEvaluatorUtil() { + } + + /** + * Check pull missing dependencies. + * + * @return true, if successful + * @throws JDOMException the JDOM exception + */ + public boolean checkPullMissingDependencies() throws JDOMException { + JexlEngine jexl = new OozieJexlEngine(); + + String expression = CoordUtils.getInputLogic(coordAction.getActionXml().toString()); + if (StringUtils.isEmpty(expression)) { + return true; + } + Expression e = jexl.createExpression(expression); + + JexlContext jc = new OozieJexlParser(jexl, new CoordInputLogicBuilder(new CoordInputLogicEvaluatorPhaseOne( + coordAction, coordAction.getPullInputDependencies()))); + CoordInputLogicEvaluatorResult result = (CoordInputLogicEvaluatorResult) e.evaluate(jc); + log.debug("Input logic expression for [{0}] and evaluate result is [{1}]", expression, result.isTrue()); + + if (result.isWaiting()) { + return false; + } + return result.isTrue(); + } + + /** + * Validate input logic. + * + * @throws JDOMException the JDOM exception + * @throws CommandException + */ + public void validateInputLogic() throws JDOMException, CommandException { + JexlEngine jexl = new OozieJexlEngine(); + String expression = CoordUtils.getInputLogic(coordAction.getActionXml().toString()); + if (StringUtils.isEmpty(expression)) { + return; + } + Expression e = jexl.createExpression(expression); + JexlContext jc = new OozieJexlParser(jexl, new CoordInputLogicBuilder( + new CoordInputLogicEvaluatorPhaseValidate(coordAction))); + try { + Object result = e.evaluate(jc); + log.debug("Input logic expression is [{0}] and evaluate result is [{1}]", expression, result); + + } + catch (RuntimeException re) { + throw new CommandException(ErrorCode.E1028, re.getCause().getMessage()); + } + + } + + /** + * Get input dependencies. + * + * @param name the name + * @param syncCoordAction the sync coord action + * @return the string + * @throws JDOMException the JDOM exception + */ + public String getInputDependencies(String name, SyncCoordAction syncCoordAction) throws JDOMException { + JexlEngine jexl = new OozieJexlEngine(); + + CoordinatorActionBean coordAction = new CoordinatorActionBean(); + ELEvaluator eval = ELEvaluator.getCurrent(); + coordAction.setId(syncCoordAction.getActionId()); + Element eJob = XmlUtils.parseXml(eval.getVariable(".actionInputLogic").toString()); + String expression = new InputLogicParser().parseWithName(eJob, name); + + Expression e = jexl.createExpression(expression); + + CoordPullInputDependency pull = (CoordPullInputDependency) syncCoordAction.getPullDependencies(); + CoordPushInputDependency push = (CoordPushInputDependency) syncCoordAction.getPushDependencies(); + + coordAction.setPushInputDependencies(push); + + coordAction.setPullInputDependencies(pull); + + JexlContext jc = new OozieJexlParser(jexl, new CoordInputLogicBuilder(new CoordInputLogicEvaluatorPhaseThree( + coordAction, eval))); + CoordInputLogicEvaluatorResult result = (CoordInputLogicEvaluatorResult) e.evaluate(jc); + log.debug("Input logic expression for [{0}] is [{1}] and evaluate result is [{2}]", name, expression, + result.isTrue()); + + if (!result.isTrue()) { + return name + " is not resolved"; + } + return result.getDataSets(); + + } + + /** + * Check push dependencies. + * + * @return true, if successful + * @throws JDOMException the JDOM exception + */ + public boolean checkPushDependencies() throws JDOMException { + JexlEngine jexl = new OozieJexlEngine(); + + String expression = CoordUtils.getInputLogic(coordAction.getActionXml().toString()); + if (StringUtils.isEmpty(expression)) { + return true; + } + + Expression e = jexl.createExpression(expression); + JexlContext jc = new OozieJexlParser(jexl, new CoordInputLogicBuilder(new CoordInputLogicEvaluatorPhaseOne( + coordAction, coordAction.getPushInputDependencies()))); + CoordInputLogicEvaluatorResult result = (CoordInputLogicEvaluatorResult) e.evaluate(jc); + log.debug("Input logic expression for [{0}] and evaluate result is [{1}]", expression, result.isTrue()); + + if (result.isWaiting()) { + return false; + } + return result.isTrue(); + } + + /** + * Check unresolved. + * + * @param actualTime the actual time + * @return true, if successful + * @throws JDOMException the JDOM exception + */ + public boolean checkUnResolved(Date actualTime) throws JDOMException { + JexlEngine jexl = new OozieJexlEngine(); + + String expression = CoordUtils.getInputLogic(coordAction.getActionXml().toString()); + if (StringUtils.isEmpty(expression)) { + return true; + } + + Expression e = jexl.createExpression(expression); + JexlContext jc = new OozieJexlParser(jexl, new CoordInputLogicBuilder(new CoordInputLogicEvaluatorPhaseTwo( + coordAction, actualTime))); + CoordInputLogicEvaluatorResult result = (CoordInputLogicEvaluatorResult) e.evaluate(jc); + log.debug("Input logic expression for [{0}] and evaluate result is [{1}]", expression, result.isTrue()); + + if (result.isWaiting()) { + return false; + } + return result.isTrue(); + + } + + public class OozieJexlParser implements JexlContext, NamespaceResolver { + private final JexlEngine jexl; + private final CoordInputLogicBuilder object; + + @Override + public Object resolveNamespace(String name) { + return object; + } + + public OozieJexlParser(JexlEngine engine, CoordInputLogicBuilder wrapped) { + this.jexl = engine; + this.object = wrapped; + } + + public Object get(String name) { + return jexl.getProperty(object, name); + } + + public void set(String name, Object value) { + jexl.setProperty(object, name, value); + } + + public boolean has(String name) { + return jexl.getUberspect().getPropertyGet(object, name, null) != null; + } + + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/InputLogicParser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/logic/InputLogicParser.java b/core/src/main/java/org/apache/oozie/coord/input/logic/InputLogicParser.java new file mode 100644 index 0000000..f1f6b41 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/logic/InputLogicParser.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.coord.input.logic; + +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.jdom.Element; +import org.jdom.Namespace; + +/** + * Parses xml into jexl expression + */ +public class InputLogicParser { + + public final static String COORD_INPUT_EVENTS_DATA_IN = "data-in"; + + public final static String AND = "and"; + + public final static String OR = "or"; + + public final static String COMBINE = "combine"; + + /** + * Parses the xml. + * + * @param root the root + * @return the string + */ + public String parse(Element root) { + return parseWithName(root, null); + + } + + /** + * Parses the xml with name. + * + * @param root the root + * @param name the name + * @return the string + */ + @SuppressWarnings("unchecked") + public String parseWithName(Element root, String name) { + if (root == null) { + return ""; + } + StringBuffer parsedString = new StringBuffer(); + + List<Element> childrens = root.getChildren(); + for (int i = 0; i < childrens.size(); i++) { + String childName = childrens.get(i).getAttributeValue("name"); + String min = childrens.get(i).getAttributeValue("min"); + String wait = childrens.get(i).getAttributeValue("wait"); + + if (name == null || name.equals(childName)) { + parsedString.append(parse(childrens.get(i), getOpt(childrens.get(i).getName()), min, wait)); + } + else { + parsedString.append(parseWithName(childrens.get(i), name)); + } + } + return parsedString.toString(); + } + + public String parse(Element root, String opt, String min, String wait) { + StringBuffer parsedString = new StringBuffer(); + + Namespace ns = root.getNamespace(); + if (root.getName().equals(COMBINE)) { + parsedString.append("("); + parsedString.append(processCombinedNode(root, getOpt(root.getName()), getMin(root, min), + getWait(root, wait))); + parsedString.append(")"); + } + else if (root.getName().equals(AND) || root.getName().equals(OR)) { + parsedString.append("("); + parsedString.append(parseAllChildren(root, opt, getOpt(root.getName()), getMin(root, min), + getWait(root, wait))); + parsedString.append(")"); + + } + else if (root.getChild(COORD_INPUT_EVENTS_DATA_IN, ns) != null) { + parsedString.append("("); + parsedString.append(processChildNode(root, getOpt(root.getName()), getMin(root, min), getWait(root, wait))); + parsedString.append(")"); + } + else if (root.getName().equals(COORD_INPUT_EVENTS_DATA_IN)) { + parsedString.append(parseDataInNode(root, min, wait)); + + } + return parsedString.toString(); + + } + + /** + * Parses the all children. + * + * @param root the root + * @param parentOpt the parent opt + * @param opt the opt + * @param min the min + * @param wait the wait + * @return the string + */ + @SuppressWarnings("unchecked") + private String parseAllChildren(Element root, String parentOpt, String opt, String min, String wait) { + StringBuffer parsedString = new StringBuffer(); + + List<Element> childrens = root.getChildren(); + for (int i = 0; i < childrens.size(); i++) { + String currentMin = min; + String currentWait = wait; + String childMin = childrens.get(i).getAttributeValue("min"); + String childWait = childrens.get(i).getAttributeValue("wait"); + if (!StringUtils.isEmpty(childMin)) { + currentMin = childMin; + } + if (!StringUtils.isEmpty(childWait)) { + currentWait = childWait; + } + parsedString.append(parse(childrens.get(i), opt, currentMin, currentWait)); + if (i < childrens.size() - 1) { + if (!StringUtils.isEmpty(opt)) + parsedString.append(" " + opt + " "); + } + } + return parsedString.toString(); + + } + + /** + * Parses the data in node. + * + * @param root the root + * @param min the min + * @param wait the wait + * @return the string + */ + private String parseDataInNode(Element root, String min, String wait) { + StringBuffer parsedString = new StringBuffer(); + + String nestedChildDataName = root.getAttributeValue("dataset"); + + parsedString.append("dependencyBuilder.input(\"" + nestedChildDataName + "\")"); + appendMin(root, min, parsedString); + appendWait(root, wait, parsedString); + parsedString.append(".build()"); + return parsedString.toString(); + } + + /** + * Process child node. + * + * @param root the root + * @param opt the opt + * @param min the min + * @param wait the wait + * @return the string + */ + @SuppressWarnings("unchecked") + private String processChildNode(final Element root, final String opt, final String min, final String wait) { + StringBuffer parsedString = new StringBuffer(); + + Namespace ns = root.getNamespace(); + + List<Element> childrens = root.getChildren(COORD_INPUT_EVENTS_DATA_IN, ns); + + for (int i = 0; i < childrens.size(); i++) { + parsedString.append(parseDataInNode(childrens.get(i), min, wait)); + + if (i < childrens.size() - 1) { + parsedString.append(" " + opt + " "); + } + } + return parsedString.toString(); + } + + /** + * Process combined node. + * + * @param root the root + * @param opt the opt + * @param min the min + * @param wait the wait + * @return the string + */ + @SuppressWarnings("unchecked") + private String processCombinedNode(final Element root, final String opt, final String min, final String wait) { + StringBuffer parsedString = new StringBuffer(); + + Namespace ns = root.getNamespace(); + + List<Element> childrens = root.getChildren(COORD_INPUT_EVENTS_DATA_IN, ns); + parsedString.append("dependencyBuilder.combine("); + + for (int i = 0; i < childrens.size(); i++) { + String nestedChildDataName = childrens.get(i).getAttributeValue("dataset"); + parsedString.append("\"" + nestedChildDataName + "\""); + if (i < childrens.size() - 1) { + parsedString.append(","); + } + } + parsedString.append(")"); + + appendMin(root, min, parsedString); + appendWait(root, wait, parsedString); + parsedString.append(".build()"); + return parsedString.toString(); + + } + + /** + * Gets the opt. + * + * @param opt the opt + * @return the opt + */ + private String getOpt(String opt) { + if (opt.equalsIgnoreCase("or")) { + return "||"; + } + + if (opt.equalsIgnoreCase("and")) { + return "&&"; + } + + return ""; + + } + + /** + * Gets the min. + * + * @param root the root + * @param parentMin the parent min + * @return the min + */ + private String getMin(Element root, String parentMin) { + String min = root.getAttributeValue("min"); + if (StringUtils.isEmpty(min)) { + return parentMin; + } + return min; + + } + + /** + * Gets the wait. + * + * @param root the root + * @param parentWait the parent wait + * @return the wait + */ + private String getWait(Element root, String parentWait) { + String wait = root.getAttributeValue("wait"); + if (StringUtils.isEmpty(parentWait)) { + return parentWait; + } + return wait; + + } + + private void appendWait(final Element root, String wait, StringBuffer parsedString) { + String childWait = root.getAttributeValue("wait"); + if (!StringUtils.isEmpty(childWait)) { + parsedString.append(".inputWait(" + childWait + ")"); + + } + else { + if (!StringUtils.isEmpty(wait)) { + parsedString.append(".inputWait(" + wait + ")"); + + } + } + + } + + private void appendMin(final Element root, String min, StringBuffer parsedString) { + String childMin = root.getAttributeValue("min"); + + if (!StringUtils.isEmpty(childMin)) { + parsedString.append(".min(" + childMin + ")"); + + } + else { + if (!StringUtils.isEmpty(min)) { + parsedString.append(".min(" + min + ")"); + + } + } + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlEngine.java b/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlEngine.java new file mode 100644 index 0000000..66c4f2b --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlEngine.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.coord.input.logic; + +import org.apache.commons.jexl2.Interpreter; +import org.apache.commons.jexl2.JexlContext; +import org.apache.commons.jexl2.JexlEngine; + +/** + * Oozie implementation of Jexl Engine + * + */ +public class OozieJexlEngine extends JexlEngine { + OozieJexlInterpreter oozieInterpreter; + + public OozieJexlEngine() { + } + + protected Interpreter createInterpreter(JexlContext context, boolean strictFlag, boolean silentFlag) { + if (oozieInterpreter == null) { + oozieInterpreter = new OozieJexlInterpreter(this, context == null ? EMPTY_CONTEXT : context, true, + silentFlag); + } + return oozieInterpreter; + } + + public OozieJexlInterpreter getOozieInterpreter() { + return oozieInterpreter; + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlInterpreter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlInterpreter.java b/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlInterpreter.java new file mode 100644 index 0000000..2044723 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/coord/input/logic/OozieJexlInterpreter.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.coord.input.logic; + +import org.apache.commons.jexl2.Interpreter; +import org.apache.commons.jexl2.JexlContext; +import org.apache.commons.jexl2.JexlEngine; +import org.apache.commons.jexl2.parser.ASTAndNode; +import org.apache.commons.jexl2.parser.ASTOrNode; +import org.apache.commons.jexl2.parser.JexlNode; + +/** + * Oozie implementation of jexl Interpreter + */ +public class OozieJexlInterpreter extends Interpreter { + + protected OozieJexlInterpreter(Interpreter base) { + super(base); + } + + public Object interpret(JexlNode node) { + return node.jjtAccept(this, ""); + } + + public OozieJexlInterpreter(JexlEngine jexlEngine, JexlContext jexlContext, boolean strictFlag, boolean silentFlag) { + super(jexlEngine, jexlContext, strictFlag, silentFlag); + } + + public Object visit(ASTOrNode node, Object data) { + CoordInputLogicEvaluatorResult left = (CoordInputLogicEvaluatorResult) node.jjtGetChild(0) + .jjtAccept(this, data); + + if (left.isTrue()) { + return left; + } + + return node.jjtGetChild(1).jjtAccept(this, data); + } + + /** {@inheritDoc} */ + public Object visit(ASTAndNode node, Object data) { + CoordInputLogicEvaluatorResult left = (CoordInputLogicEvaluatorResult) node.jjtGetChild(0) + .jjtAccept(this, data); + + if (!left.isTrue()) { + return left; + } + CoordInputLogicEvaluatorResult right = (CoordInputLogicEvaluatorResult) node.jjtGetChild(1).jjtAccept(this, + data); + if (right.isTrue()) { + right.appendDataSets(left.getDataSets()); + } + + return right; + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java b/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java index c280d1d..fe7a327 100644 --- a/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java +++ b/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java @@ -25,7 +25,7 @@ public class ActionDependency { private List<String> missingDependencies; private List<String> availableDependencies; - ActionDependency(List<String> missingDependencies, List<String> availableDependencies) { + public ActionDependency(List<String> missingDependencies, List<String> availableDependencies) { this.missingDependencies = missingDependencies; this.availableDependencies = availableDependencies; } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java b/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java index a550757..bdd854f 100644 --- a/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java +++ b/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java @@ -21,7 +21,9 @@ package org.apache.oozie.dependency; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.ErrorCode; @@ -53,6 +55,9 @@ public class DependencyChecker { * @return missing dependencies as a array */ public static String[] dependenciesAsArray(String missingDependencies) { + if(StringUtils.isEmpty(missingDependencies)){ + return new String[0]; + } return missingDependencies.split(CoordELFunctions.INSTANCE_SEPARATOR); } @@ -69,7 +74,7 @@ public class DependencyChecker { */ public static ActionDependency checkForAvailability(String missingDependencies, Configuration actionConf, boolean stopOnFirstMissing) throws CommandException { - return checkForAvailability(dependenciesAsArray(missingDependencies), actionConf, stopOnFirstMissing); + return checkForAvailability(Arrays.asList(dependenciesAsArray(missingDependencies)), actionConf, stopOnFirstMissing); } /** @@ -83,7 +88,7 @@ public class DependencyChecker { * @return ActionDependency which has the list of missing and available dependencies * @throws CommandException */ - public static ActionDependency checkForAvailability(String[] missingDependencies, Configuration actionConf, + public static ActionDependency checkForAvailability(List<String> missingDependencies, Configuration actionConf, boolean stopOnFirstMissing) throws CommandException { final XLog LOG = XLog.getLog(DependencyChecker.class); //OOZIE-1251. Don't initialize as static variable. String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); @@ -92,9 +97,9 @@ public class DependencyChecker { URIHandlerService uriService = Services.get().get(URIHandlerService.class); boolean continueChecking = true; try { - for (int index = 0; index < missingDependencies.length; index++) { + for (int index = 0; index < missingDependencies.size(); index++) { if (continueChecking) { - String dependency = missingDependencies[index]; + String dependency = missingDependencies.get(index); URI uri = new URI(dependency); URIHandler uriHandler = uriService.getURIHandler(uri); @@ -113,7 +118,7 @@ public class DependencyChecker { } else { - missingDeps.add(missingDependencies[index]); + missingDeps.add(missingDependencies.get(index)); } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java b/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java index 7c1aadf..65d85b8 100644 --- a/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java +++ b/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java @@ -114,6 +114,15 @@ public class FSURIHandler implements URIHandler { } @Override + public String getURIWithoutDoneFlag(String uri, String doneFlag) throws URIHandlerException { + if (doneFlag.length() > 0 && uri.endsWith(doneFlag)) { + return uri.substring(0, uri.lastIndexOf("/" + doneFlag)); + } + return uri; + } + + + @Override public void validate(String uri) throws URIHandlerException { } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java index 1bbf37d..67b37ec 100644 --- a/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java +++ b/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java @@ -211,6 +211,11 @@ public class HCatURIHandler implements URIHandler { } @Override + public String getURIWithoutDoneFlag(String uri, String doneFlag) throws URIHandlerException { + return uri; + } + + @Override public void validate(String uri) throws URIHandlerException { try { new HCatURI(uri); // will fail if uri syntax is incorrect http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/dependency/URIHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/URIHandler.java b/core/src/main/java/org/apache/oozie/dependency/URIHandler.java index bc94716..45e23fb 100644 --- a/core/src/main/java/org/apache/oozie/dependency/URIHandler.java +++ b/core/src/main/java/org/apache/oozie/dependency/URIHandler.java @@ -169,6 +169,19 @@ public interface URIHandler { public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException; /** + * Get the URI path from path which has done flag + * + * @param uri URI of the dependency + * @param doneFlag flag that determines URI availability + * + * @return the final URI without the doneFlag incorporated + * + * @throws URIHandlerException + */ + public String getURIWithoutDoneFlag(String uri, String doneFlag) throws URIHandlerException; + + + /** * Check whether the URI is valid or not * @param uri * @throws URIHandlerException @@ -220,4 +233,5 @@ public interface URIHandler { } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/util/WritableUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/WritableUtils.java b/core/src/main/java/org/apache/oozie/util/WritableUtils.java index 76a6895..aa027e3 100644 --- a/core/src/main/java/org/apache/oozie/util/WritableUtils.java +++ b/core/src/main/java/org/apache/oozie/util/WritableUtils.java @@ -20,6 +20,7 @@ package org.apache.oozie.util; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.oozie.compression.CodecFactory; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -28,12 +29,19 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.DataOutput; import java.io.DataInput; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; /** * Utility class to write/read Hadoop writables to/from a byte array. */ public class WritableUtils { + public static XLog LOG = XLog.getLog(WritableUtils.class); + /** * Write a writable to a byte array. * @@ -60,7 +68,6 @@ public class WritableUtils { * @param clazz writable class. * @return writable deserialized from the byte array. */ - @SuppressWarnings("unchecked") public static <T extends Writable> T fromByteArray(byte[] array, Class<T> clazz) { try { T o = (T) ReflectionUtils.newInstance(clazz, null); @@ -99,4 +106,143 @@ public class WritableUtils { String str = dataInput.readUTF(); return (str.equals(NULL)) ? null : str; } + + /** + * Read list. + * + * @param <T> the generic type + * @param dataInput the data input + * @param clazz the clazz + * @return the list + * @throws IOException Signals that an I/O exception has occurred. + */ + public static <T extends Writable> List<T> readList(DataInput dataInput, Class<T> clazz) throws IOException { + List<T> a = new ArrayList<T>(); + int count = dataInput.readInt(); + for (int i = 0; i < count; i++) { + T o = (T) ReflectionUtils.newInstance(clazz, null); + o.readFields(dataInput); + a.add(o); + } + return a; + } + + public static List<String> readStringList(DataInput dataInput) throws IOException { + List<String> a = new ArrayList<String>(); + int count = dataInput.readInt(); + for (int i = 0; i < count; i++) { + a.add(readBytesAsString(dataInput)); + } + return a; + } + + /** + * Write list. + * + * @param <T> the generic type + * @param dataOutput the data output + * @param list the list + * @throws IOException Signals that an I/O exception has occurred. + */ + public static <T extends Writable> void writeList(DataOutput dataOutput, List<T> list) throws IOException { + dataOutput.writeInt(list.size()); + for (T t : list) { + t.write(dataOutput); + } + } + + public static void writeStringList(DataOutput dataOutput, List<String> list) throws IOException { + dataOutput.writeInt(list.size()); + for (String str : list) { + writeStringAsBytes(dataOutput, str); + } + } + + /** + * Write map. + * + * @param <T> the generic type + * @param dataOutput the data output + * @param map the map + * @throws IOException Signals that an I/O exception has occurred. + */ + public static <T extends Writable> void writeMap(DataOutput dataOutput, Map<String, T> map) throws IOException { + dataOutput.writeInt(map.size()); + for (Entry<String, T> t : map.entrySet()) { + writeStringAsBytes(dataOutput, t.getKey()); + t.getValue().write(dataOutput); + } + } + + /** + * Write map with list. + * + * @param <T> the generic type + * @param dataOutput the data output + * @param map the map + * @throws IOException Signals that an I/O exception has occurred. + */ + public static <T extends Writable> void writeMapWithList(DataOutput dataOutput, Map<String, List<T>> map) + throws IOException { + dataOutput.writeInt(map.size()); + for (Entry<String, List<T>> t : map.entrySet()) { + writeStringAsBytes(dataOutput, t.getKey()); + writeList(dataOutput, t.getValue()); + } + } + + /** + * Read map. + * + * @param <T> the generic type + * @param dataInput the data input + * @param clazz the clazz + * @return the map + * @throws IOException Signals that an I/O exception has occurred. + */ + public static <T extends Writable> Map<String, T> readMap(DataInput dataInput, Class<T> clazz) throws IOException { + Map<String, T> map = new HashMap<String, T>(); + int count = dataInput.readInt(); + for (int i = 0; i < count; i++) { + String key = readBytesAsString(dataInput); + T value = (T) ReflectionUtils.newInstance(clazz, null); + value.readFields(dataInput); + map.put(key, value); + } + return map; + } + + /** + * Read map with list. + * + * @param <T> the generic type + * @param dataInput the data input + * @param clazz the clazz + * @return the map + * @throws IOException Signals that an I/O exception has occurred. + */ + public static <T extends Writable> Map<String, List<T>> readMapWithList(DataInput dataInput, Class<T> clazz) + throws IOException { + Map<String, List<T>> map = new HashMap<String, List<T>>(); + int count = dataInput.readInt(); + for (int i = 0; i < count; i++) { + String key = readBytesAsString(dataInput); + map.put(key, readList(dataInput, clazz)); + } + return map; + } + + public static void writeStringAsBytes(DataOutput dOut, String value) throws IOException { + byte[] data = value.getBytes(CodecFactory.UTF_8_ENCODING); + dOut.writeInt(data.length); + dOut.write(data); + } + + public static String readBytesAsString(DataInput dIn) throws IOException { + int length = dIn.readInt(); + byte[] data = new byte[length]; + dIn.readFully(data); + return new String(data, CodecFactory.UTF_8_ENCODING); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index ca49fa6..3ff7320 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -1516,7 +1516,7 @@ <name>oozie.service.SchemaService.coord.schemas</name> <value> oozie-coordinator-0.1.xsd,oozie-coordinator-0.2.xsd,oozie-coordinator-0.3.xsd,oozie-coordinator-0.4.xsd, - oozie-sla-0.1.xsd,oozie-sla-0.2.xsd + oozie-coordinator-0.5.xsd,oozie-sla-0.1.xsd,oozie-sla-0.2.xsd </value> <description> List of schemas for coordinators (separated by commas). http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java index 1fe1b3a..c27a40a 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java @@ -19,7 +19,6 @@ package org.apache.oozie.command.coord; import java.io.IOException; -import java.io.Reader; import java.util.Arrays; import java.util.Date; import java.util.List; @@ -34,6 +33,7 @@ import org.apache.oozie.client.CoordinatorJob.Execution; import org.apache.oozie.client.CoordinatorJob.Timeunit; import org.apache.oozie.command.CommandException; import org.apache.oozie.coord.CoordELFunctions; +import org.apache.oozie.coord.input.dependency.CoordOldInputDependency; import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor; @@ -553,13 +553,14 @@ public class TestCoordActionInputCheckXCommand extends XDataTestCase { Path appPath = new Path(getFsTestCaseDir(), "coord"); String inputDir = appPath.toString() + "/coord-input/2010/07/09/01/00"; String nonExistDir = inputDir.replaceFirst("localhost", "nonExist"); + CoordinatorActionBean actionBean = new CoordinatorActionBean(); try { - caicc.pathExists(nonExistDir, new XConfiguration(), getTestUser()); + new CoordOldInputDependency().pathExists(actionBean, nonExistDir, new XConfiguration(), getTestUser()); fail("Should throw exception due to non-existent NN path. Therefore fail"); } catch (IOException ioe) { - assertEquals(caicc.getCoordActionErrorCode(), "E0901"); - assertTrue(caicc.getCoordActionErrorMsg().contains("not in Oozie's whitelist")); + assertEquals(actionBean.getErrorCode(), "E0901"); + assertTrue(actionBean.getErrorMessage().contains("not in Oozie's whitelist")); } }
