Repository: oozie Updated Branches: refs/heads/master 34289c4c7 -> 27e4bf168
OOZIE-3381 [coordinator] Enhance logging of CoordElFunctions (andras.piros via kmarton) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/27e4bf16 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/27e4bf16 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/27e4bf16 Branch: refs/heads/master Commit: 27e4bf1688a6a7750b9c8454de5021337696fd61 Parents: 34289c4 Author: Julia Kinga Marton <[email protected]> Authored: Thu Nov 22 11:31:12 2018 +0100 Committer: Julia Kinga Marton <[email protected]> Committed: Thu Nov 22 11:31:12 2018 +0100 ---------------------------------------------------------------------- .../java/org/apache/oozie/command/XCommand.java | 2 +- .../apache/oozie/coord/CoordELFunctions.java | 683 ++++++++++++------- .../oozie/command/coord/CoordELExtensions.java | 7 +- .../oozie/coord/TestOozieTimeUnitConverter.java | 76 +++ release-log.txt | 1 + 5 files changed, 503 insertions(+), 266 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/core/src/main/java/org/apache/oozie/command/XCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/XCommand.java b/core/src/main/java/org/apache/oozie/command/XCommand.java index a80444e..7b2dbd5 100644 --- a/core/src/main/java/org/apache/oozie/command/XCommand.java +++ b/core/src/main/java/org/apache/oozie/command/XCommand.java @@ -218,7 +218,7 @@ public abstract class XCommand<T> implements XCallable<T> { getLockTimeOut(), getName()); } else { - throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut()); + throw new CommandException(ErrorCode.E0606, getEntityKey(), getLockTimeOut()); } } else { http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java index 10f4f0d..c3fecd8 100644 --- a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java +++ b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java @@ -18,6 +18,8 @@ package org.apache.oozie.coord; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; @@ -28,6 +30,7 @@ import org.apache.oozie.command.CommandException; import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil; import org.apache.oozie.dependency.URIHandler; import org.apache.oozie.dependency.URIHandler.Context; +import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.Services; import org.apache.oozie.service.URIHandlerService; import org.apache.oozie.util.DateUtils; @@ -37,18 +40,23 @@ import org.apache.oozie.util.XLog; import org.jdom.JDOMException; import java.net.URI; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; import java.util.TimeZone; +import java.util.concurrent.atomic.AtomicInteger; /** * This class implements the EL function related to coordinator */ public class CoordELFunctions { + private static final XLog LOG = XLog.getLog(CoordELFunctions.class); + final public static String DATASET = "oozie.coord.el.dataset.bean"; final public static String COORD_ACTION = "oozie.coord.el.app.bean"; final public static String CONFIGURATION = "oozie.coord.el.conf"; @@ -320,105 +328,8 @@ public class CoordELFunctions { return coord_futureRange_sync(n, n, instance); } - private static String coord_futureRange_sync(int startOffset, int endOffset, 
int instance) throws Exception { - final XLog LOG = XLog.getLog(CoordELFunctions.class); - final Thread currentThread = Thread.currentThread(); - ELEvaluator eval = ELEvaluator.getCurrent(); - String retVal = ""; - int datasetFrequency = (int) getDSFrequency();// in minutes - TimeUnit dsTimeUnit = getDSTimeUnit(); - int[] instCount = new int[1]; - Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); - StringBuilder resolvedInstances = new StringBuilder(); - StringBuilder resolvedURIPaths = new StringBuilder(); - if (nominalInstanceCal != null) { - Calendar initInstance = getInitialInstanceCal(); - nominalInstanceCal = (Calendar) initInstance.clone(); - nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); - - SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); - if (ds == null) { - throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); - } - String uriTemplate = ds.getUriTemplate(); - Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); - if (conf == null) { - throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); - } - int available = 0, checkedInstance = 0; - boolean resolved = false; - String user = ParamChecker - .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); - String doneFlag = ds.getDoneFlag(); - URIHandlerService uriService = Services.get().get(URIHandlerService.class); - URIHandler uriHandler = null; - Context uriContext = null; - try { - while (instance >= checkedInstance && !currentThread.isInterrupted()) { - ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); - String uriPath = uriEval.evaluate(uriTemplate, String.class); - if (uriHandler == null) { - URI uri = new URI(uriPath); - uriHandler = uriService.getURIHandler(uri); - uriContext = uriHandler.getContext(uri, conf, user, true); - } - String uriWithDoneFlag = 
uriHandler.getURIWithDoneFlag(uriPath, doneFlag); - if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) { - if (available == endOffset) { - LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag); - resolved = true; - resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); - resolvedURIPaths.append(uriPath); - retVal = resolvedInstances.toString(); - eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); - break; - } - else if (available >= startOffset) { - LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag); - resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append( - INSTANCE_SEPARATOR); - resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); - - } - available++; - } - // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency); - nominalInstanceCal = (Calendar) initInstance.clone(); - instCount[0]++; - nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); - checkedInstance++; - // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); - } - if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) { - eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); - } - - } - finally { - if (uriContext != null) { - uriContext.destroy(); - } - } - if (!resolved) { - // return unchanged future function with variable 'is_resolved' - // to 'false' - eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE); - if (startOffset == endOffset) { - retVal = "${coord:future(" + startOffset + ", " + instance + ")}"; - } - else { - retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}"; - } - } - else { - eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE); - } - } - else {// No feasible nominal time - eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE); - retVal = ""; - } - return retVal; 
+ private static String coord_futureRange_sync(final int startOffset, final int endOffset, final int instance) throws Exception { + return new FutureEvaluator(startOffset, endOffset, instance).evaluate(); } /** @@ -843,7 +754,7 @@ public class CoordELFunctions { } public static String ph2_coord_absolute_range(String startInstance, int end) throws Exception { - int[] instanceCount = new int[1]; + final AtomicInteger instanceCount = new AtomicInteger(0); Calendar startInstanceCal = DateUtils.getCalendar(startInstance); Calendar currentInstance = getCurrentInstance(startInstanceCal.getTime(), instanceCount); // getCurrentInstance() returns null, which means startInstance is less @@ -859,7 +770,7 @@ public class CoordELFunctions { + DateUtils.formatDateOozieTZ(getInitialInstanceCal()) + " and start-instance is " + DateUtils.formatDateOozieTZ(startInstanceCal)); } - int[] nominalCount = new int[1]; + final AtomicInteger nominalCount = new AtomicInteger(0); if (getCurrentInstance(getActionCreationtime(), nominalCount) == null) { throw new CommandException(ErrorCode.E1010, "initial-instance should be equal or earlier than the nominal time. initial-instance is " @@ -868,11 +779,11 @@ public class CoordELFunctions { // getCurrentInstance return offset relative to initial instance. // start instance offset - nominal offset = start offset relative to // nominal time-stamp. - int start = instanceCount[0] - nominalCount[0]; + int start = instanceCount.get() - nominalCount.get(); if (start > end) { throw new CommandException(ErrorCode.E1010, "start-instance should be equal or earlier than the end-instance. 
startInstance is " - + startInstance + " which is equivalent to current (" + instanceCount[0] + + startInstance + " which is equivalent to current (" + instanceCount.get() + ") but end is specified as current (" + end + ")"); } return ph2_coord_currentRange(start, end); @@ -1051,7 +962,7 @@ public class CoordELFunctions { final XLog LOG = XLog.getLog(CoordELFunctions.class); int datasetFrequency = getDSFrequency();// in minutes TimeUnit dsTimeUnit = getDSTimeUnit(); - int[] instCount = new int[1];// used as pass by ref + final AtomicInteger instCount = new AtomicInteger(0);// used as pass by ref Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount); if (nominalInstanceCal == null) { LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is" @@ -1062,7 +973,7 @@ public class CoordELFunctions { Calendar initInstance = getInitialInstanceCal(); // Add in the reverse order - newest instance first. nominalInstanceCal = (Calendar) initInstance.clone(); - nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), (instCount[0] + start) * datasetFrequency); + nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), (instCount.get() + start) * datasetFrequency); List<String> instances = new ArrayList<String>(); for (int i = start; i <= end; i++) { if (nominalInstanceCal.compareTo(initInstance) < 0) { @@ -1153,108 +1064,7 @@ public class CoordELFunctions { } private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception { - final XLog LOG = XLog.getLog(CoordELFunctions.class); - final Thread currentThread = Thread.currentThread(); - ELEvaluator eval = ELEvaluator.getCurrent(); - String retVal = ""; - int datasetFrequency = (int) getDSFrequency();// in minutes - TimeUnit dsTimeUnit = getDSTimeUnit(); - int[] instCount = new int[1]; - boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false); - Calendar nominalInstanceCal; - if (useCurrentTime) { - 
nominalInstanceCal = getCurrentInstance(new Date(), instCount); - } - else { - nominalInstanceCal = getCurrentInstance(getActualTime(), instCount); - } - StringBuilder resolvedInstances = new StringBuilder(); - StringBuilder resolvedURIPaths = new StringBuilder(); - if (nominalInstanceCal != null) { - Calendar initInstance = getInitialInstanceCal(); - SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); - if (ds == null) { - throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); - } - String uriTemplate = ds.getUriTemplate(); - Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); - if (conf == null) { - throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); - } - int available = 0; - boolean resolved = false; - String user = ParamChecker - .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); - String doneFlag = ds.getDoneFlag(); - URIHandlerService uriService = Services.get().get(URIHandlerService.class); - URIHandler uriHandler = null; - Context uriContext = null; - try { - while (nominalInstanceCal.compareTo(initInstance) >= 0 && !currentThread.isInterrupted()) { - ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal); - String uriPath = uriEval.evaluate(uriTemplate, String.class); - if (uriHandler == null) { - URI uri = new URI(uriPath); - uriHandler = uriService.getURIHandler(uri); - uriContext = uriHandler.getContext(uri, conf, user, true); - } - String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag); - if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) { - XLog.getLog(CoordELFunctions.class) - .debug("Found latest(" + available + "): " + uriWithDoneFlag); - if (available == startOffset) { - LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag); - resolved = true; - resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)); - resolvedURIPaths.append(uriPath); - 
retVal = resolvedInstances.toString(); - eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); - - break; - } - else if (available <= endOffset) { - LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag); - resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append( - INSTANCE_SEPARATOR); - resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); - } - - available--; - } - // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency); - nominalInstanceCal = (Calendar) initInstance.clone(); - instCount[0]--; - nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency); - // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag()); - } - if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) { - eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); - } - } - finally { - if (uriContext != null) { - uriContext.destroy(); - } - } - if (!resolved) { - // return unchanged latest function with variable 'is_resolved' - // to 'false' - eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE); - if (startOffset == endOffset) { - retVal = "${coord:latest(" + startOffset + ")}"; - } - else { - retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}"; - } - } - else { - eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE); - } - } - else {// No feasible nominal time - eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE); - } - return retVal; + return new LatestEvaluator(startOffset, endOffset).evaluate(); } /** @@ -1406,7 +1216,7 @@ public class CoordELFunctions { * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of * the dataset. 
*/ - public static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) { + public static Calendar getCurrentInstance(Date effectiveTime, AtomicInteger instanceCount) { ELEvaluator eval = ELEvaluator.getCurrent(); return getCurrentInstance(effectiveTime, instanceCount, eval); } @@ -1417,7 +1227,7 @@ public class CoordELFunctions { * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of * the dataset. */ - private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) { + private static Calendar getCurrentInstance(Date effectiveTime, AtomicInteger instanceCount, ELEvaluator eval) { Date datasetInitialInstance = getInitialInstance(eval); TimeUnit dsTimeUnit = getDSTimeUnit(eval); TimeZone dsTZ = getDatasetTZ(eval); @@ -1429,91 +1239,52 @@ public class CoordELFunctions { Calendar calEffectiveTime = new GregorianCalendar(dsTZ); calEffectiveTime.setTime(effectiveTime); if (instanceCount == null) { // caller doesn't care about this value - instanceCount = new int[1]; + instanceCount = new AtomicInteger(0); } - instanceCount[0] = 0; + instanceCount.set(0); if (current.compareTo(calEffectiveTime) > 0) { return null; } switch(dsTimeUnit) { case MINUTE: - instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / MINUTE_MSEC); + instanceCount.set((int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / MINUTE_MSEC)); break; case HOUR: - instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / HOUR_MSEC); + instanceCount.set((int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / HOUR_MSEC)); break; case DAY: case END_OF_DAY: - instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / DAY_MSEC); + instanceCount.set((int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / DAY_MSEC)); break; case WEEK: case END_OF_WEEK: - 
instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / WEEK_MSEC); + instanceCount.set((int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / WEEK_MSEC)); break; case MONTH: case END_OF_MONTH: int diffYear = calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR); - instanceCount[0] = diffYear * 12 + calEffectiveTime.get(Calendar.MONTH) - current.get(Calendar.MONTH); + instanceCount.set(diffYear * 12 + calEffectiveTime.get(Calendar.MONTH) - current.get(Calendar.MONTH)); break; case YEAR: - instanceCount[0] = calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR); + instanceCount.set(calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR)); break; default: throw new IllegalArgumentException("Unhandled dataset time unit " + dsTimeUnit); } - if (instanceCount[0] > 2) { - instanceCount[0] = (instanceCount[0] / dsFreq); - current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); + if (instanceCount.get() > 2) { + instanceCount.set(instanceCount.get() / dsFreq); + current.add(dsTimeUnit.getCalendarUnit(), instanceCount.get() * dsFreq); } else { - instanceCount[0] = 0; + instanceCount.set(0); } while (!current.getTime().after(effectiveTime)) { current.add(dsTimeUnit.getCalendarUnit(), dsFreq); - instanceCount[0]++; + instanceCount.incrementAndGet(); } current.add(dsTimeUnit.getCalendarUnit(), -dsFreq); - instanceCount[0]--; - return current; - } - - /** - * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time) - * - * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of - * the dataset. 
- */ - private static Calendar getCurrentInstance_old(Date effectiveTime, int instanceCount[], ELEvaluator eval) { - Date datasetInitialInstance = getInitialInstance(eval); - TimeUnit dsTimeUnit = getDSTimeUnit(eval); - TimeZone dsTZ = getDatasetTZ(eval); - int dsFreq = getDSFrequency(eval); - // Convert Date to Calendar for corresponding TZ - Calendar current = Calendar.getInstance(); - current.setTime(datasetInitialInstance); - current.setTimeZone(dsTZ); - - Calendar calEffectiveTime = Calendar.getInstance(); - calEffectiveTime.setTime(effectiveTime); - calEffectiveTime.setTimeZone(dsTZ); - if (instanceCount == null) { // caller doesn't care about this value - instanceCount = new int[1]; - } - instanceCount[0] = 0; - if (current.compareTo(calEffectiveTime) > 0) { - return null; - } - Calendar origCurrent = (Calendar) current.clone(); - while (current.compareTo(calEffectiveTime) <= 0) { - current = (Calendar) origCurrent.clone(); - instanceCount[0]++; - current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); - } - instanceCount[0]--; - - current = (Calendar) origCurrent.clone(); - current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq); + instanceCount.decrementAndGet(); return current; } @@ -1706,4 +1477,392 @@ public class CoordELFunctions { cal.add(timeUnit.getCalendarUnit(), n); return cal; } + + /** + * Evaluating {@code coord:future()} and {@code coord:futureRange()} data input dependencies. 
+ */ + static final class FutureEvaluator extends RangeEvaluator { + private final int instance; + private int checkedInstance = 0; + + FutureEvaluator(final int startOffset, final int endOffset, final int instance) { + super("future", startOffset, endOffset); + this.instance = instance; + } + + @Override + protected Calendar getNominalInstance() { + return getCurrentInstance(getActionCreationtime(), instCount); + } + + @Override + protected void reset() { + super.reset(); + checkedInstance = 0; + } + + @Override + protected boolean isAvailable(final Calendar nominalInstance, final Calendar initInstance) { + return instance >= checkedInstance; + } + + @Override + protected boolean isFirst() { + return available == endOffset; + } + + @Override + protected boolean isInBetween() { + return available >= startOffset; + } + + @Override + protected void stepAvailable() { + available++; + } + + @Override + protected void stepInstanceCount() { + instCount.incrementAndGet(); + checkedInstance++; + } + + @Override + protected String from() { + return String.format("%s, %s", startOffset, instance); + } + + @Override + protected String fromTo() { + return String.format("%s, %s, %s", startOffset, endOffset, instance); + } + } + + /** + * Evaluating {@code coord:latest()} and {@code coord:latestRange()} data input dependencies. 
+ */ + static final class LatestEvaluator extends RangeEvaluator { + + LatestEvaluator(final int startOffset, final int endOffset) { + super("latest", startOffset, endOffset); + } + + @Override + protected Calendar getNominalInstance() { + final boolean useCurrentTime = ConfigurationService.getBoolean(LATEST_EL_USE_CURRENT_TIME, false); + if (useCurrentTime) { + return getCurrentInstance(new Date(), instCount); + } + else { + return getCurrentInstance(getActualTime(), instCount); + } + } + + @Override + protected boolean isAvailable(final Calendar nominalInstance, final Calendar initInstance) { + return nominalInstance.compareTo(initInstance) >= 0; + } + + @Override + protected boolean isFirst() { + return available == startOffset; + } + + @Override + protected boolean isInBetween() { + return available <= endOffset; + } + + @Override + protected void stepAvailable() { + available--; + } + + @Override + protected void stepInstanceCount() { + instCount.decrementAndGet(); + } + + @Override + protected String from() { + return String.format("%s", startOffset); + } + + @Override + protected String fromTo() { + return String.format("%s, %s", startOffset, endOffset); + } + } + + private static abstract class RangeEvaluator { + /** + * What is the use case: evaluating {@code future} or {@code latest} dataset occurrences. + */ + private final String type; + + /** + * + */ + final int startOffset; + final int endOffset; + + final AtomicInteger instCount = new AtomicInteger(0); + int available = 0; + + RangeEvaluator(final String type, final int startOffset, final int endOffset) { + this.type = type; + this.startOffset = startOffset; + this.endOffset = endOffset; + } + + /** + * Evaluate the input data dependency's EL function based on HDFS checks for URI existence, or leave it unevaluated. 
+ * <p> + * Based on following: + * <ul> + * <li>{@code startOffset}</li> + * <li>{@code endOffset}</li> + * <li>internal state like {@code instCount} and {@code available}</li> + * </ul> + * @return timestamp of the {@code future} / {@code latest} HDFS URI based on parameters, and HDFS URI presence, or the + * unmodified input EL expression, if not present. + * @throws Exception in case of wrong arguments, or HDFS access errors + */ + String evaluate() throws Exception { + final Thread currentThread = Thread.currentThread(); + final ELEvaluator eval = ELEvaluator.getCurrent(); + String retVal = ""; + final int datasetFrequency = getDSFrequency();// in minutes + final TimeUnit dsTimeUnit = getDSTimeUnit(); + instCount.set(0); + + Calendar nominalInstance = getNominalInstance(); + + final StringBuilder resolvedInstances = new StringBuilder(); + final StringBuilder resolvedURIPaths = new StringBuilder(); + if (nominalInstance != null) { + final Calendar initInstance = getInitialInstanceCal(); + SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET); + if (ds == null) { + throw new RuntimeException("Associated Dataset should be defined with key " + DATASET); + } + final String uriTemplate = ds.getUriTemplate(); + final Configuration conf = (Configuration) eval.getVariable(CONFIGURATION); + if (conf == null) { + throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION); + } + + reset(); + + boolean resolved = false; + final String user = ParamChecker + .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME); + final String doneFlag = ds.getDoneFlag(); + final URIHandlerService uriService = Services.get().get(URIHandlerService.class); + URIHandler uriHandler = null; + Context uriContext = null; + try { + int retries = 0; + final DateFormat dMYHMS = new SimpleDateFormat("yyyy.MM.dd HH:mm:ss"); + final long maxRetries = new OozieTimeUnitConverter().convertMillis( + 
(nominalInstance.getTime().getTime() - initInstance.getTime().getTime()) / ds.getFrequency(), + dsTimeUnit); + if (maxRetries > 0) { + LOG.debug("Approximately [{0}] maximal retries going back till [{1}] for checking latest " + + "dataset existence. [name={2}]", + maxRetries, + dMYHMS.format(initInstance.getTime()), + ds.getName()); + } + while (isAvailable(nominalInstance, initInstance) && !currentThread.isInterrupted()) { + final ELEvaluator uriEval = getUriEvaluator(nominalInstance); + final String uriPath = uriEval.evaluate(uriTemplate, String.class); + if (uriHandler == null) { + URI uri = new URI(uriPath); + uriHandler = uriService.getURIHandler(uri); + uriContext = uriHandler.getContext(uri, conf, user, true); + } + final String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag); + LOG.trace("Checking dataset existence. [name={0};uriWithDoneFlag={1};retries={2};nominalTime={3}]", + ds.getName(), + uriWithDoneFlag, + retries, + dMYHMS.format(nominalInstance.getTime())); + final Date now = new Date(); + final boolean uriWithDoneFlagExists = uriHandler.exists(new URI(uriWithDoneFlag), uriContext); + final Date later = new Date(); + LOG.trace("[{0}] ms elapsed while checking for dataset existence. [name={1};uriWithDoneFlag={2}]", + later.getTime() - now.getTime(), + ds.getName(), + uriWithDoneFlag); + if (uriWithDoneFlagExists) { + LOG.debug("Found current dataset for {0}({1}). [name={2};uriWithDoneFlag={3}", + type, + available, + ds.getName(), + uriWithDoneFlag); + if (isFirst()) { + LOG.debug("Matched first dataset for {0}({1}), resolving. 
[name={2};uriWithDoneFlag={3}]", + type, + available, + ds.getName(), + uriWithDoneFlag); + resolved = true; + resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstance)); + resolvedURIPaths.append(uriPath); + retVal = resolvedInstances.toString(); + eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); + + break; + } + else if (isInBetween()) { + LOG.debug("Matched dataset in between for {0}({1}), continuing. [name={2};uriWithDoneFlag={3}]", + type, + available, + ds.getName(), + uriWithDoneFlag); + resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstance)).append( + INSTANCE_SEPARATOR); + resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR); + } + else { + LOG.debug("Not matching dataset for {0}({1}), continuing. [name={2};uriWithDoneFlag={3}]", + type, + available, + ds.getName(), + uriWithDoneFlag); + } + + stepAvailable(); + } + else { + LOG.trace("Could not find dataset. [name={0};uriWithDoneFlag={1}]", + ds.getName(), + uriWithDoneFlag); + } + + stepInstanceCount(); + + nominalInstance = (Calendar) initInstance.clone(); + nominalInstance.add(dsTimeUnit.getCalendarUnit(), instCount.get() * datasetFrequency); + + retries++; + } + if (!StringUtils.isEmpty(resolvedURIPaths.toString()) + && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) { + eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString()); + } + } + finally { + if (uriContext != null) { + uriContext.destroy(); + } + } + if (!resolved) { + // return unchanged function with variable 'is_resolved' + // to 'false' + eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE); + if (startOffset == endOffset) { + retVal = String.format("${coord:%s(%s)}", type, from()); + } + else { + retVal = String.format("${coord:%sRange(%s)}", type, fromTo()); + } + } + else { + eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE); + } + } + else {// No feasible nominal time + eval.setVariable(CoordELConstants.IS_RESOLVED, 
Boolean.FALSE); + } + + return retVal; + + } + + /** + * Get the nominal {@link Calendar} instance, that is, as base of the next evaluated occurrence. + * @return a {@link Calendar} instance based on {@link #type} + */ + protected abstract Calendar getNominalInstance(); + + /** + * Reset the evaluator's internal state between two {@link #evaluate()} calls. + */ + protected void reset() { + available = 0; + } + + /** + * Checks whether a dataset is available. + * @param nominalInstance the nominal instance + * @param initInstance + * @return + */ + protected abstract boolean isAvailable(Calendar nominalInstance, Calendar initInstance); + + /** + * Checks whether it's the first matching for the given evaluation. + * @return + */ + protected abstract boolean isFirst(); + + /** + * Checks whether it's not the first but a valid matching for the given evaluation. + * @return + */ + protected abstract boolean isInBetween(); + + /** + * Modify the internal state in case of a match. + */ + protected abstract void stepAvailable(); + + /** + * Modify the internal state in case an instance was checked. + */ + protected abstract void stepInstanceCount(); + + /** + * Substitution of the range parameters for an open range with only a starting point. + * @return + */ + protected abstract String from(); + + + /** + * Substitution of the range parameters for a closed range with a starting and an ending point. + * @return + */ + protected abstract String fromTo(); + } + + @VisibleForTesting + static class OozieTimeUnitConverter { + /** + * Convert {@code millis} given {@code source} to {@link java.util.concurrent.TimeUnit}. 
+ * @param millis + * @param source + * @return -1 if no correct {@code source} was given, else the estimated occurrence count of a dataset + */ + long convertMillis(final long millis, final TimeUnit source) { + Preconditions.checkNotNull(source, "source has to be filled"); + + switch (source) { + case YEAR: + return java.util.concurrent.TimeUnit.DAYS.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS) / 365; + case MONTH: + return java.util.concurrent.TimeUnit.DAYS.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS) / 31; + case DAY: + return java.util.concurrent.TimeUnit.DAYS.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS); + case HOUR: + return java.util.concurrent.TimeUnit.HOURS.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS); + case MINUTE: + return java.util.concurrent.TimeUnit.MINUTES.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS); + default: + return -1; + } + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java b/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java index 796d19c..2880641 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java +++ b/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java @@ -19,6 +19,7 @@ package org.apache.oozie.command.coord; import java.util.Calendar; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.oozie.coord.CoordELFunctions; import org.apache.oozie.util.ELEvaluator; @@ -45,19 +46,19 @@ public class CoordELExtensions { dsInstanceCal.set(Calendar.SECOND, 0); dsInstanceCal.set(Calendar.MILLISECOND, 0); - int[] instCnt = new int[1]; + final AtomicInteger instCnt = new AtomicInteger(0); Calendar compInstCal = CoordELFunctions 
.getCurrentInstance(dsInstanceCal.getTime(), instCnt); if (compInstCal == null) { return ""; } - int dsInstanceCnt = instCnt[0]; + int dsInstanceCnt = instCnt.get(); compInstCal = CoordELFunctions.getCurrentInstance(nominalInstanceCal.getTime(), instCnt); if (compInstCal == null) { return ""; } - int nominalInstanceCnt = instCnt[0]; + int nominalInstanceCnt = instCnt.get(); return "coord:current(" + (dsInstanceCnt - nominalInstanceCnt) + ")"; } http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/core/src/test/java/org/apache/oozie/coord/TestOozieTimeUnitConverter.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/coord/TestOozieTimeUnitConverter.java b/core/src/test/java/org/apache/oozie/coord/TestOozieTimeUnitConverter.java new file mode 100644 index 0000000..c31c5cc --- /dev/null +++ b/core/src/test/java/org/apache/oozie/coord/TestOozieTimeUnitConverter.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.coord; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.junit.Assert.assertEquals; + +public class TestOozieTimeUnitConverter { /* Unit tests for CoordELFunctions.OozieTimeUnitConverter#convertMillis. NOTE(review): only MINUTE, HOUR, DAY and unsupported sources are exercised; YEAR and MONTH are not covered. */ + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Test + public void whenSourceIsNullNPEIsThrown() { /* a null source must be rejected with a NullPointerException */ + expectedException.expect(NullPointerException.class); + + new CoordELFunctions.OozieTimeUnitConverter().convertMillis(0, null); + } + + @Test + public void whenSourceIsNotRecognized() { /* sources without a conversion branch in convertMillis are expected to yield -1 */ + assertMillisConverted(-1, 0, TimeUnit.END_OF_DAY); + assertMillisConverted(-1, 0, TimeUnit.END_OF_WEEK); + assertMillisConverted(-1, 0, TimeUnit.END_OF_MONTH); + assertMillisConverted(-1, 0, TimeUnit.CRON); + assertMillisConverted(-1, 0, TimeUnit.NONE); + } + + @Test + public void whenSourceMillisAreConvertedToMinutesCorrectly() { /* partial units truncate toward zero for both signs; exactly one unit maps to 1 or -1 */ + assertMillisConverted(0, 1, TimeUnit.MINUTE); + assertMillisConverted(0, -1, TimeUnit.MINUTE); + assertMillisConverted(1, 60_000, TimeUnit.MINUTE); + assertMillisConverted(-1, -60_000, TimeUnit.MINUTE); + } + + @Test + public void whenSourceMillisAreConvertedToHoursCorrectly() { + assertMillisConverted(0, 1, TimeUnit.HOUR); + assertMillisConverted(0, -1, TimeUnit.HOUR); + assertMillisConverted(1, 3_600_000, TimeUnit.HOUR); + assertMillisConverted(-1, -3_600_000, TimeUnit.HOUR); + } + + @Test + public void whenSourceMillisAreConvertedToDaysCorrectly() { + assertMillisConverted(0, 1, TimeUnit.DAY); + assertMillisConverted(0, -1, TimeUnit.DAY); + assertMillisConverted(1, 86_400_000, TimeUnit.DAY); + assertMillisConverted(-1, -86_400_000, TimeUnit.DAY); + } + + private void assertMillisConverted(final long expectedTUCount, final long millis, final TimeUnit oozieTU) { /* asserts that millis converts to expectedTUCount units of oozieTU, using a fresh converter instance per call */ + assertEquals(String.format("%d millis are converted to %s correctly", millis, oozieTU.name()), + expectedTUCount, + new CoordELFunctions.OozieTimeUnitConverter().convertMillis(millis, oozieTU)); + } +} \ No newline at end of
file http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index ae54814..1ebbdd4 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.2.0 release (trunk - unreleased) +OOZIE-3381 [coordinator] Enhance logging of CoordElFunctions (andras.piros via kmarton) OOZIE-3380 TestCoordMaterializeTransitionXCommand failure after DST change date (asalamon74 via kmarton) OOZIE-3338 [build] Remove SVN references (asalamon74 via andras.piros) OOZIE-3378 [core] Coordinator action's status is SUBMITTED after E1003 error (asalamon74 via andras.piros)
