FALCON-1802 EL Expressions support in Native Scheduler Author: pavankumar526 <[email protected]>
Reviewers: "sandeepSamudrala <[email protected]>, Ajay Yadava <[email protected]>" Closes #57 from pavankumar526/FALCON-1802 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7e4dc0d9 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7e4dc0d9 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7e4dc0d9 Branch: refs/heads/master Commit: 7e4dc0d923b5ef3d44f7fa5c85681d6fa89e0caf Parents: 62393fe Author: pavankumar526 <[email protected]> Authored: Tue Apr 19 12:03:57 2016 +0530 Committer: pavankumar526 <[email protected]> Committed: Tue Apr 19 12:03:57 2016 +0530 ---------------------------------------------------------------------- .../org/apache/falcon/entity/EntityUtil.java | 27 +++ .../falcon/expression/ExpressionHelper.java | 44 ++++ .../apache/falcon/persistence/EntityBean.java | 15 +- .../org/apache/falcon/util/CalendarUnit.java | 39 ++++ .../java/org/apache/falcon/util/DateUtil.java | 175 +++++++++++++++ .../falcon/expression/ExpressionHelperTest.java | 24 ++ oozie/pom.xml | 6 + .../apache/falcon/oozie/OozieEntityBuilder.java | 9 + .../OozieOrchestrationWorkflowBuilder.java | 29 ++- .../NativeOozieProcessWorkflowBuilder.java | 222 +++++++++++++++++++ .../execution/FalconExecutionService.java | 16 +- .../execution/ProcessExecutionInstance.java | 109 ++++++--- .../falcon/execution/ProcessExecutor.java | 37 +++- .../apache/falcon/execution/SchedulerUtil.java | 34 +++ .../service/impl/DataAvailabilityService.java | 9 +- .../service/impl/SchedulerService.java | 6 +- .../request/DataNotificationRequest.java | 17 ++ .../org/apache/falcon/predicate/Predicate.java | 1 + .../org/apache/falcon/state/EntityState.java | 11 + .../org/apache/falcon/state/InstanceState.java | 6 +- .../org/apache/falcon/state/StateService.java | 19 +- .../falcon/state/store/EntityStateStore.java | 2 +- .../falcon/state/store/jdbc/BeanMapperUtil.java | 37 +++- .../falcon/state/store/jdbc/JDBCStateStore.java | 27 
++- .../falcon/workflow/engine/DAGEngine.java | 6 +- .../workflow/engine/FalconWorkflowEngine.java | 18 +- .../falcon/workflow/engine/OozieDAGEngine.java | 75 +++---- .../execution/FalconExecutionServiceTest.java | 7 +- .../apache/falcon/execution/MockDAGEngine.java | 4 +- scheduler/src/test/resources/runtime.properties | 25 +++ src/conf/runtime.properties | 11 + .../apache/falcon/unit/FalconUnitTestBase.java | 2 +- .../AbstractSchedulerManagerJerseyIT.java | 7 +- .../InstanceSchedulerManagerJerseyIT.java | 35 ++- .../resources/process-nolatedata-template.xml | 50 +++++ 35 files changed, 1040 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index 96befa1..8825a65 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -94,6 +94,15 @@ public final class EntityUtil { public static final String WF_LIB_SEPARATOR = ","; private static final String STAGING_DIR_NAME_SEPARATOR = "_"; + public static final ThreadLocal<SimpleDateFormat> PATH_FORMAT = new ThreadLocal<SimpleDateFormat>() { + @Override + protected SimpleDateFormat initialValue() { + SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmm"); + format.setTimeZone(TimeZone.getTimeZone("UTC")); + return format; + } + }; + /** Priority with which the DAG will be scheduled. * Matches the five priorities of Hadoop jobs. */ @@ -1082,4 +1091,22 @@ public final class EntityUtil { } return JOBPRIORITY.NORMAL; } + + + /** + * Evaluates feedpath based on instance time. 
+ * @param feedPath + * @param instanceTime + * @return + */ + public static String evaluateDependentPath(String feedPath, Date instanceTime) { + String timestamp = PATH_FORMAT.get().format(instanceTime); + String instancePath = feedPath.replaceAll("\\$\\{YEAR\\}", timestamp.substring(0, 4)); + instancePath = instancePath.replaceAll("\\$\\{MONTH\\}", timestamp.substring(4, 6)); + instancePath = instancePath.replaceAll("\\$\\{DAY\\}", timestamp.substring(6, 8)); + instancePath = instancePath.replaceAll("\\$\\{HOUR\\}", timestamp.substring(8, 10)); + instancePath = instancePath.replaceAll("\\$\\{MINUTE\\}", timestamp.substring(10, 12)); + return instancePath; + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java index 65aaeba..451cbba 100644 --- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java +++ b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java @@ -21,6 +21,8 @@ package org.apache.falcon.expression; import org.apache.commons.el.ExpressionEvaluatorImpl; import org.apache.falcon.FalconException; import org.apache.falcon.entity.common.FeedDataPath; +import org.apache.falcon.util.CalendarUnit; +import org.apache.falcon.util.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +31,7 @@ import javax.servlet.jsp.el.ExpressionEvaluator; import javax.servlet.jsp.el.FunctionMapper; import javax.servlet.jsp.el.VariableResolver; import java.lang.reflect.Method; +import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; @@ -53,6 +56,7 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver private static final 
ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl(); private static final ExpressionHelper RESOLVER = ExpressionHelper.get(); + public static final ThreadLocal<SimpleDateFormat> FORMATTER = new ThreadLocal<SimpleDateFormat>() { @Override protected SimpleDateFormat initialValue() { @@ -257,4 +261,44 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver return originalValue; } + /** + * Converts date string to required format. + * @param dateTimeStr + * @param format + * @return + * @throws ParseException + */ + public static String formatTime(String dateTimeStr, String format) throws ParseException { + Date dateTime = DateUtil.parseDateFalconTZ(dateTimeStr); + return DateUtil.formatDateCustom(dateTime, format); + } + + /** + * Formats the reference (instance) date and returns it. + * @return + */ + public static String instanceTime() { + return DateUtil.formatDateFalconTZ(referenceDate.get()); + } + + /** + * EL function that calculates a date based on the following equation: newDate = baseDate + offset * unit. 
+ * @param strBaseDate + * @param offset + * @param unit + * @return + * @throws Exception + */ + public static String dateOffset(String strBaseDate, int offset, String unit) throws Exception { + Calendar baseCalDate = DateUtil.getCalendar(strBaseDate); + StringBuilder buffer = new StringBuilder(); + baseCalDate.add(CalendarUnit.valueOf(unit).getCalendarUnit(), offset); + buffer.append(DateUtil.formatDateFalconTZ(baseCalDate)); + return buffer.toString(); + } + + public static String user() { + return "${user.name}"; + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/persistence/EntityBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java index 274305c..f1c9cf3 100644 --- a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java @@ -24,6 +24,7 @@ import javax.persistence.CascadeType; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; +import javax.persistence.Lob; import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; import javax.persistence.OneToMany; @@ -38,7 +39,7 @@ import java.util.List; @NamedQueries({ @NamedQuery(name = PersistenceConstants.GET_ENTITY, query = "select OBJECT(a) from EntityBean a where a.id = :id"), @NamedQuery(name = PersistenceConstants.GET_ENTITY_FOR_STATE, query = "select OBJECT(a) from EntityBean a where a.state = :state"), - @NamedQuery(name = PersistenceConstants.UPDATE_ENTITY, query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"), + @NamedQuery(name = PersistenceConstants.UPDATE_ENTITY, query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type, a.properties = :properties where a.id = :id"), 
@NamedQuery(name = PersistenceConstants.GET_ENTITIES_FOR_TYPE, query = "select OBJECT(a) from EntityBean a where a.type = :type"), @NamedQuery(name = PersistenceConstants.GET_ENTITIES, query = "select OBJECT(a) from EntityBean a"), @NamedQuery(name = PersistenceConstants.DELETE_ENTITY, query = "delete from EntityBean a where a.id = :id"), @@ -68,6 +69,10 @@ public class EntityBean { @Column(name = "current_state") private String state; + @Column(name = "properties") + @Lob + private byte[] properties; + @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean") private List<InstanceBean> instanceBeans; @@ -113,5 +118,13 @@ public class EntityBean { public void setInstanceBeans(List<InstanceBean> instanceBeans) { this.instanceBeans = instanceBeans; } + + public byte[] getProperties() { + return properties; + } + + public void setProperties(byte[] properties) { + this.properties = properties; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/util/CalendarUnit.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/CalendarUnit.java b/common/src/main/java/org/apache/falcon/util/CalendarUnit.java new file mode 100644 index 0000000..f2a40b9 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/util/CalendarUnit.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.util; + +import java.util.Calendar; + +/** + * TimeUnit used for Date operations. + */ +public enum CalendarUnit { + MINUTE(Calendar.MINUTE), HOUR(Calendar.HOUR), DAY(Calendar.DATE), MONTH(Calendar.MONTH), + YEAR(Calendar.YEAR), END_OF_DAY(Calendar.DATE), END_OF_MONTH(Calendar.MONTH), CRON(0), NONE(-1); + + private int calendarUnit; + + private CalendarUnit(int calendarUnit) { + this.calendarUnit = calendarUnit; + } + + public int getCalendarUnit() { + return calendarUnit; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/util/DateUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java index baf5b13..9e9b8e8 100644 --- a/common/src/main/java/org/apache/falcon/util/DateUtil.java +++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java @@ -17,12 +17,20 @@ */ package org.apache.falcon.util; +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.Frequency; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.ParsePosition; +import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.TimeZone; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Helper to get date operations. 
@@ -39,8 +47,37 @@ public final class DateUtil { public static final long HOUR_IN_MILLIS = 60 * 60 * 1000; + private static final Pattern GMT_OFFSET_COLON_PATTERN = Pattern.compile("^GMT(\\-|\\+)(\\d{2})(\\d{2})$"); + + public static final TimeZone UTC = getTimeZone("UTC"); + + public static final String ISO8601_UTC_MASK = "yyyy-MM-dd'T'HH:mm'Z'"; + private static String activeTimeMask = ISO8601_UTC_MASK; + private static TimeZone activeTimeZone = UTC; + + private static final Pattern VALID_TIMEZONE_PATTERN = Pattern.compile("^UTC$|^GMT(\\+|\\-)\\d{4}$"); + + private static final String ISO8601_TZ_MASK_WITHOUT_OFFSET = "yyyy-MM-dd'T'HH:mm"; + private static boolean entityInUTC = true; + private DateUtil() {} + /** + * Configures the Datetime parsing with process timezone. + * + */ + public static void setTimeZone(String tz) throws FalconException { + if (StringUtils.isBlank(tz)) { + tz = "UTC"; + } + if (!VALID_TIMEZONE_PATTERN.matcher(tz).matches()) { + throw new FalconException("Invalid entity timezone, it must be 'UTC' or 'GMT(+/-)####"); + } + activeTimeZone = TimeZone.getTimeZone(tz); + entityInUTC = activeTimeZone.equals(UTC); + activeTimeMask = (entityInUTC) ? ISO8601_UTC_MASK : ISO8601_TZ_MASK_WITHOUT_OFFSET + tz.substring(3); + } + public static Date getNextMinute(Date time) throws Exception { Calendar insCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); insCal.setTime(time); @@ -99,4 +136,142 @@ public final class DateUtil { public static Date offsetTime(Date date, int seconds) { return new Date(1000L * seconds + date.getTime()); } + + /** + * Parses a datetime in ISO8601 format in the process timezone. + * + * @param s string with the datetime to parse. + * @return the corresponding {@link java.util.Date} instance for the parsed date. + * @throws java.text.ParseException thrown if the given string was + * not an ISO8601 value for the process timezone. 
+ */ + public static Date parseDateFalconTZ(String s) throws ParseException { + s = s.trim(); + ParsePosition pos = new ParsePosition(0); + Date d = getISO8601DateFormat(activeTimeZone, activeTimeMask).parse(s, pos); + if (d == null) { + throw new ParseException("Could not parse [" + s + "] using [" + activeTimeMask + "] mask", + pos.getErrorIndex()); + } + if (s.length() > pos.getIndex()) { + throw new ParseException("Correct datetime string is followed by invalid characters: " + s, pos.getIndex()); + } + return d; + } + + private static DateFormat getISO8601DateFormat(TimeZone tz, String mask) { + DateFormat dateFormat = new SimpleDateFormat(mask); + // Stricter parsing to prevent dates such as 2011-12-50T01:00Z (December 50th) from matching + dateFormat.setLenient(false); + dateFormat.setTimeZone(tz); + return dateFormat; + } + + private static DateFormat getSpecificDateFormat(String format) { + DateFormat dateFormat = new SimpleDateFormat(format); + dateFormat.setTimeZone(activeTimeZone); + return dateFormat; + } + + /** + * Formats a {@link java.util.Date} as a string using the specified format mask. + * <p/> + * The format mask must be a {@link java.text.SimpleDateFormat} valid format mask. + * + * @param d {@link java.util.Date} to format. + * @return the string for the given date using the specified format mask, + * <code>NULL</code> if the {@link java.util.Date} instance was <code>NULL</code> + */ + public static String formatDateCustom(Date d, String format) { + return (d != null) ? getSpecificDateFormat(format).format(d) : "NULL"; + } + + /** + * Formats a {@link java.util.Date} as a string in ISO8601 format using process timezone. + * + * @param d {@link java.util.Date} to format. + * @return the ISO8601 string for the given date, <code>NULL</code> if the {@link java.util.Date} instance was + * <code>NULL</code> + */ + public static String formatDateFalconTZ(Date d) { + return (d != null) ? 
getISO8601DateFormat(activeTimeZone, activeTimeMask).format(d) : "NULL"; + } + + /** + * Returns the {@link java.util.TimeZone} for the given timezone ID. + * + * @param tzId timezone ID. + * @return the {@link java.util.TimeZone} for the given timezone ID. + */ + public static TimeZone getTimeZone(String tzId) { + if (tzId == null) { + throw new IllegalArgumentException("Timezone cannot be null"); + } + tzId = handleGMTOffsetTZNames(tzId); // account for GMT-#### + TimeZone tz = TimeZone.getTimeZone(tzId); + // If these are not equal, it means that the tzId is not valid (invalid tzId's return GMT) + if (!tz.getID().equals(tzId)) { + throw new IllegalArgumentException("Invalid TimeZone: " + tzId); + } + return tz; + } + + /** + * {@link java.util.TimeZone#getTimeZone(String)} takes the timezone ID as an argument; for invalid IDs + * it returns the <code>GMT</code> TimeZone. A timezone ID formatted like <code>GMT-####</code> is not a valid ID, + * however, it will actually map this to the <code>GMT-##:##</code> TimeZone, instead of returning the + * <code>GMT</code> TimeZone. We (later) check that a timezone ID is valid by calling + * {@link java.util.TimeZone#getTimeZone(String)} and seeing if the returned + * TimeZone ID is equal to the original; because we want to allow <code>GMT-####</code>, while still + * disallowing actual invalid IDs, we have to manually replace <code>GMT-####</code> + * with <code>GMT-##:##</code> first. + * + * @param tzId The timezone ID + * @return If tzId matches <code>GMT-####</code>, then we return <code>GMT-##:##</code>; otherwise, + * we return tzId unaltered + */ + private static String handleGMTOffsetTZNames(String tzId) { + Matcher m = GMT_OFFSET_COLON_PATTERN.matcher(tzId); + if (m.matches() && m.groupCount() == 3) { + tzId = "GMT" + m.group(1) + m.group(2) + ":" + m.group(3); + } + return tzId; + } + + /** + * Create a Calendar instance for the active (process) time zone using the specified date. 
+ * @param dateString + * @return appropriate Calendar object + * @throws Exception + */ + public static Calendar getCalendar(String dateString) throws Exception { + return getCalendar(dateString, activeTimeZone); + } + + /** + * Create a Calendar instance using the specified date and Time zone. + * @param dateString + * @param tz : TimeZone + * @return appropriate Calendar object + * @throws Exception + */ + public static Calendar getCalendar(String dateString, TimeZone tz) throws Exception { + Date date = DateUtil.parseDateFalconTZ(dateString); + Calendar calDate = Calendar.getInstance(); + calDate.setTime(date); + calDate.setTimeZone(tz); + return calDate; + } + + /** + * Formats a {@link java.util.Calendar} as a string in ISO8601 format process timezone. + * + * @param c {@link java.util.Calendar} to format. + * @return the ISO8601 string for the given date, <code>NULL</code> if the {@link java.util.Calendar} instance was + * <code>NULL</code> + */ + public static String formatDateFalconTZ(Calendar c) { + return (c != null) ? 
formatDateFalconTZ(c.getTime()) : "NULL"; + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java b/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java index da5dbca..b3895c3 100644 --- a/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java +++ b/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java @@ -81,4 +81,28 @@ public class ExpressionHelperTest { {"future(1,0)", "2015-02-01T00:00Z"}, }; } + + @Test + public void testFormatTime() throws FalconException { + String output = expressionHelper.evaluate("formatTime(\"2016-02-01T10:59Z\", \"yyyy\")", + String.class); + Assert.assertEquals(output, "2016"); + output = expressionHelper.evaluate("formatTime(\"2016-02-01T10:59Z\", \"yyyy-MM\")", + String.class); + Assert.assertEquals(output, "2016-02"); + output = expressionHelper.evaluate("formatTime(\"2016-02-01T10:59Z\", \"yyyy-MM-dd\")", + String.class); + Assert.assertEquals(output, "2016-02-01"); + } + + + @Test + public void testOffsetAndInstanceTime() throws FalconException { + String date = expressionHelper.evaluate("dateOffset(instanceTime(), 1, 'DAY')", String.class); + Assert.assertEquals(date, "2015-02-02T00:00Z"); + date = expressionHelper.evaluate("dateOffset(instanceTime(), 3, 'HOUR')", String.class); + Assert.assertEquals(date, "2015-02-01T03:00Z"); + date = expressionHelper.evaluate("dateOffset(instanceTime(), -25, 'MINUTE')", String.class); + Assert.assertEquals(date, "2015-01-31T23:35Z"); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/oozie/pom.xml ---------------------------------------------------------------------- diff --git a/oozie/pom.xml b/oozie/pom.xml index 04b3df6..c14c625 100644 --- a/oozie/pom.xml +++ 
b/oozie/pom.xml @@ -103,6 +103,12 @@ <scope>compile</scope> </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>${joda.version}</version> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java index a36ee79..a856f8a 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java @@ -109,6 +109,14 @@ public abstract class OozieEntityBuilder<T extends Entity> { public abstract Properties build(Cluster cluster, Path buildPath) throws FalconException; public Properties build(Cluster cluster, Path buildPath, Map<String, String> properties) throws FalconException { + Properties props = new Properties(); + if (properties != null) { + props.putAll(properties); + } + return build(cluster, buildPath, props); + } + + public Properties build(Cluster cluster, Path buildPath, Properties properties) throws FalconException { Properties builderProperties = build(cluster, buildPath); if (properties == null || properties.isEmpty()) { return builderProperties; @@ -130,6 +138,7 @@ public abstract class OozieEntityBuilder<T extends Entity> { return propertiesCopy; } + protected String getStoragePath(Path path) { if (path != null) { return getStoragePath(path.toString()); http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java 
b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java index 181f2d2..562627e 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java @@ -37,6 +37,7 @@ import org.apache.falcon.oozie.feed.FSReplicationWorkflowBuilder; import org.apache.falcon.oozie.feed.FeedRetentionWorkflowBuilder; import org.apache.falcon.oozie.feed.HCatReplicationWorkflowBuilder; import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder; +import org.apache.falcon.oozie.process.NativeOozieProcessWorkflowBuilder; import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder; import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder; import org.apache.falcon.oozie.workflow.ACTION; @@ -58,6 +59,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.joda.time.DateTime; import javax.xml.bind.JAXBElement; import javax.xml.namespace.QName; @@ -93,11 +95,19 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, })); private LifeCycle lifecycle; + private DateTime nominalTime; protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L; protected static final String MR_QUEUE_NAME = "queueName"; protected static final String MR_JOB_PRIORITY = "jobPriority"; + /** + * Represents Scheduler for Entities. 
+ */ + public enum Scheduler { + OOZIE, NATIVE + } + public OozieOrchestrationWorkflowBuilder(T entity, LifeCycle lifecycle) { super(entity); this.lifecycle = lifecycle; @@ -115,7 +125,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend super(entity); } - public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle) + public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, + Tag lifecycle) throws FalconException { + return get(entity, cluster, lifecycle, Scheduler.OOZIE); + } + + public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle, + Scheduler scheduler) throws FalconException { switch (entity.getEntityType()) { case FEED: @@ -166,6 +182,9 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend return new PigProcessWorkflowBuilder(process); case OOZIE: + if (Scheduler.NATIVE == scheduler) { + return new NativeOozieProcessWorkflowBuilder(process); + } return new OozieProcessWorkflowBuilder(process); case HIVE: @@ -497,4 +516,12 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend } return conf; } + + public void setNominalTime(DateTime nominalTime) { + this.nominalTime = nominalTime; + } + + public DateTime getNominalTime() { + return nominalTime; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java new file mode 100644 index 0000000..78e049d --- /dev/null +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.oozie.process; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.Storage; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Location; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.process.Input; +import org.apache.falcon.entity.v0.process.Output; +import org.apache.falcon.entity.v0.process.Property; +import org.apache.falcon.expression.ExpressionHelper; +import org.apache.falcon.util.DateUtil; +import org.apache.falcon.workflow.WorkflowExecutionArgs; +import org.apache.hadoop.fs.Path; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Properties; + +/** + * Workflow Builder for oozie process in case of Native Scheduler. 
+ */
+public class NativeOozieProcessWorkflowBuilder extends OozieProcessWorkflowBuilder {
+
+    private static final ExpressionHelper EXPRESSION_HELPER = ExpressionHelper.get();
+    // Pattern used to render the nominal time and timestamp workflow arguments.
+    private static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm";
+
+    /**
+     * Creates a builder for the given process entity.
+     *
+     * @param entity process definition, passed through to the parent builder
+     */
+    public NativeOozieProcessWorkflowBuilder(org.apache.falcon.entity.v0.process.Process entity) {
+        super(entity);
+    }
+
+    /**
+     * Builds the workflow and returns its properties with all EL expressions already resolved.
+     *
+     * <p>The result is assembled in this order: the properties returned by
+     * {@code build(cluster, buildPath)}, overlaid with the nominal time/timestamp, the JMS
+     * notification flags, the input and output feed properties and the evaluated process
+     * properties, and finally any {@code suppliedProps} whose keys are not already present.
+     *
+     * @param cluster       cluster the workflow is built for
+     * @param buildPath     path handed to {@code build(cluster, buildPath)}
+     * @param suppliedProps user supplied properties; may be {@code null} or empty
+     * @return the merged properties
+     * @throws FalconException if an expression cannot be evaluated or the build fails
+     */
+    @Override
+    public java.util.Properties build(Cluster cluster,
+                                      Path buildPath, Properties suppliedProps) throws FalconException {
+        Properties elProps = new Properties();
+        DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
+        String formattedNominalTime = fmt.print(getNominalTime());
+        elProps.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), formattedNominalTime);
+        elProps.put(WorkflowExecutionArgs.TIMESTAMP.getName(), formattedNominalTime);
+        elProps.put(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED.getName(), "true");
+        // TODO: confirm whether system JMS notification should be "true" or "false" here.
+        elProps.put(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED.getName(), "false");
+
+        // Expressions below are evaluated in the process time zone, relative to the nominal time.
+        DateUtil.setTimeZone(entity.getTimezone().getID());
+        ExpressionHelper.setReferenceDate(new Date(getNominalTime().getMillis()));
+        elProps.putAll(getInputProps(cluster));
+        elProps.putAll(getOutputProps());
+        elProps.putAll(evalProperties());
+
+        Properties buildProps = build(cluster, buildPath);
+        buildProps.putAll(elProps);
+        copyPropsWithoutOverride(buildProps, suppliedProps);
+        return buildProps;
+    }
+
+    /**
+     * Copies user supplied properties into {@code buildProps}, skipping (and logging) every key
+     * that {@code buildProps} already contains.
+     *
+     * @param buildProps    target properties, modified in place
+     * @param suppliedProps user supplied properties; {@code null} or empty is a no-op
+     */
+    private void copyPropsWithoutOverride(Properties buildProps, Properties suppliedProps) {
+        if (suppliedProps == null || suppliedProps.isEmpty()) {
+            return;
+        }
+        for (String propertyName : suppliedProps.stringPropertyNames()) {
+            if (buildProps.containsKey(propertyName)) {
+                LOG.warn("User provided property {} is already declared in the entity and will be ignored.",
+                        propertyName);
+                continue;
+            }
+            String propertyValue = suppliedProps.getProperty(propertyName);
+            buildProps.put(propertyName, propertyValue);
+        }
+    }
+
+    /**
+     * Evaluates the value of every property declared on the process as a full EL expression.
+     *
+     * @return property name to evaluated value; empty when the process declares no properties
+     * @throws FalconException if a value cannot be evaluated
+     */
+    private Properties evalProperties() throws FalconException {
+        Properties props = new Properties();
+        org.apache.falcon.entity.v0.process.Properties processProps = entity.getProperties();
+        // A process without a properties element has nothing to evaluate; avoid an NPE.
+        if (processProps == null) {
+            return props;
+        }
+        for (Property property : processProps.getProperties()) {
+            String propName = property.getName();
+            String propValue = property.getValue();
+            String evalExp = EXPRESSION_HELPER.evaluateFullExpression(propValue, String.class);
+            props.put(propName, evalExp);
+        }
+        return props;
+    }
+
+    /**
+     * Resolves the output feed instance paths of the process.
+     *
+     * <p>For each output, every location of every feed cluster this server is responsible for is
+     * evaluated against the output instance time. DATA locations are stored under the output
+     * name, other location types under {@code <outputName>.<type>}. Feed names and all evaluated
+     * paths are additionally published as comma separated lists.
+     *
+     * @return output related workflow properties; names and paths are {@code NONE} when the
+     *         process has no outputs
+     * @throws FalconException if an entity lookup or expression evaluation fails
+     */
+    private Properties getOutputProps() throws FalconException {
+        Properties props = new Properties();
+        if (entity.getOutputs() == null) {
+            props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), NONE);
+            props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), NONE);
+            return props;
+        }
+        List<String> feedNames = new ArrayList<>();
+        List<String> feedInstancePaths = new ArrayList<>();
+        for (Output output : entity.getOutputs().getOutputs()) {
+            Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
+            feedNames.add(feed.getName());
+            String outputExp = output.getInstance();
+            Date outTime = EXPRESSION_HELPER.evaluate(outputExp, Date.class);
+            for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+                org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+                        EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
+                // Only clusters in a colo handled by this server contribute paths.
+                if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
+                    continue;
+                }
+
+                List<Location> locations = FeedHelper.getLocations(cluster, feed);
+                for (Location loc : locations) {
+                    String path = EntityUtil.evaluateDependentPath(loc.getPath(), outTime);
+                    path = getStoragePath(path);
+                    if (loc.getType() != LocationType.DATA) {
+                        props.put(output.getName() + "." + loc.getType().toString().toLowerCase(), path);
+                    } else {
+                        props.put(output.getName(), path);
+                    }
+                    feedInstancePaths.add(path);
+                }
+            }
+        }
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(feedNames, ","));
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(feedInstancePaths, ","));
+        return props;
+    }
+
+    /**
+     * Resolves the input feed instance paths of the process.
+     *
+     * <p>For each input, the start and end expressions are evaluated relative to the nominal
+     * time, and every feed instance in that range is turned into a path for each DATA location of
+     * each feed cluster this server is responsible for. The optional input partition is appended
+     * to every path. Paths of one location are joined with {@code ","} and stored under the
+     * input name; feed names, input names, paths and storage types are published as
+     * {@code "#"} separated lists.
+     *
+     * @param clusterObj cluster used to determine the storage of each input feed
+     * @return input related workflow properties; all four list properties are {@code NONE} when
+     *         the process has no inputs
+     * @throws FalconException if an entity lookup or expression evaluation fails
+     * @throws UnsupportedOperationException if an input feed is not backed by filesystem storage
+     */
+    private Properties getInputProps(Cluster clusterObj) throws FalconException {
+        Properties props = new Properties();
+
+        if (entity.getInputs() == null) {
+            props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), NONE);
+            props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), NONE);
+            props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), NONE);
+            props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), NONE);
+            return props;
+        }
+        List<String> falconInputFeeds = new ArrayList<>();
+        List<String> falconInputNames = new ArrayList<>();
+        List<String> falconInputPaths = new ArrayList<>();
+        List<String> falconInputFeedStorageTypes = new ArrayList<>();
+        for (Input input : entity.getInputs().getInputs()) {
+            Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
+            Storage storage = FeedHelper.createStorage(clusterObj, feed);
+            if (storage.getType() != Storage.TYPE.FILESYSTEM) {
+                throw new UnsupportedOperationException("Storage Type not supported " + storage.getType());
+            }
+            falconInputFeeds.add(feed.getName());
+            falconInputNames.add(input.getName());
+            falconInputFeedStorageTypes.add(storage.getType().name());
+            String partition = input.getPartition();
+
+            String startTimeExp = input.getStart();
+            String endTimeExp = input.getEnd();
+            // Anchor EL evaluation to the nominal time before resolving the input range.
+            ExpressionHelper.setReferenceDate(new Date(getNominalTime().getMillis()));
+            Date startTime = EXPRESSION_HELPER.evaluate(startTimeExp, Date.class);
+            Date endTime = EXPRESSION_HELPER.evaluate(endTimeExp, Date.class);
+
+            for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+                org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+                        EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
+                // Only clusters in a colo handled by this server contribute paths.
+                if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
+                    continue;
+                }
+
+                List<Location> locations = FeedHelper.getLocations(cluster, feed);
+                for (Location loc : locations) {
+                    // Only DATA locations are exposed as inputs; every other type is skipped.
+                    if (loc.getType() != LocationType.DATA) {
+                        continue;
+                    }
+                    List<String> paths = new ArrayList<>();
+                    // TODO: add a test for the case where startTime and endTime are equal.
+                    List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(),
+                            startTime, endTime);
+                    for (Date instanceTime : instanceTimes) {
+                        String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime);
+                        if (StringUtils.isNotBlank(partition)) {
+                            // Insert a separator only when neither side already provides one.
+                            if (!path.endsWith("/") && !partition.startsWith("/")) {
+                                path = path + "/";
+                            }
+                            path = path + partition;
+                        }
+                        path = getStoragePath(path);
+                        paths.add(path);
+                    }
+                    // The location is known to be DATA here, so the paths go under the input name.
+                    String joinedPaths = StringUtils.join(paths, ",");
+                    props.put(input.getName(), joinedPaths);
+                    falconInputPaths.add(joinedPaths);
+                }
+            }
+        }
+        props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), StringUtils.join(falconInputFeeds, "#"));
+        props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), StringUtils.join(falconInputNames, "#"));
+        props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), StringUtils.join(falconInputPaths, "#"));
+        props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(),
+                StringUtils.join(falconInputFeedStorageTypes, "#"));
+        return props;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java index 93c894d..a969c7a 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java @@ -35,6 +35,7 @@ import org.apache.falcon.state.store.AbstractStateStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -203,10 +204,11 @@ public final class FalconExecutionService implements FalconService, EntityStateC * Schedules an entity. * * @param entity + * @param properties * @throws FalconException */ - public void schedule(Entity entity) throws FalconException { - StateService.get().handleStateChange(entity, EntityState.EVENT.SCHEDULE, this); + public void schedule(Entity entity, Properties properties) throws FalconException { + StateService.get().handleStateChange(entity, EntityState.EVENT.SCHEDULE, this, properties); } /** @@ -256,4 +258,14 @@ public final class FalconExecutionService implements FalconService, EntityStateC throw new FalconException("Entity executor for entity cluster key : " + id.getKey() + " does not exist."); } } + + /** + * Schedules an entity. 
+ * + * @param entity + * @throws FalconException + */ + public void schedule(Process entity) throws FalconException { + schedule(entity, null); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java index 2d666c3..49e1120 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java @@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.notification.service.event.DataEvent; import org.apache.falcon.notification.service.event.Event; @@ -48,7 +49,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; +import java.util.Iterator; import java.util.List; import java.util.Properties; @@ -61,12 +64,14 @@ import java.util.Properties; public class ProcessExecutionInstance extends ExecutionInstance { private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionInstance.class); private final Process process; - private List<Predicate> awaitedPredicates = new ArrayList<>(); + private List<Predicate> awaitedPredicates = Collections.synchronizedList(new ArrayList<Predicate>()); private DAGEngine dagEngine = null; - private boolean hasTimedOut = false; + protected boolean hasTimedOut = false; private 
InstanceID id; private int instanceSequence; + private boolean areDataPredicatesEmpty; private final FalconExecutionService executionService = FalconExecutionService.get(); + private final ExpressionHelper expressionHelper = ExpressionHelper.get(); /** * Constructor. @@ -83,7 +88,7 @@ public class ProcessExecutionInstance extends ExecutionInstance { this.id = new InstanceID(process, cluster, getInstanceTime()); computeInstanceSequence(); dagEngine = DAGEngineFactory.getDAGEngine(cluster); - registerForNotifications(false); + areDataPredicatesEmpty = true; } /** @@ -112,7 +117,7 @@ public class ProcessExecutionInstance extends ExecutionInstance { // Currently, registers for only data notifications to ensure gating conditions are met. // Can be extended to register for other notifications. - private void registerForNotifications(boolean isResume) throws FalconException { + public void registerForNotifications(boolean isResume) throws FalconException { if (process.getInputs() == null) { return; } @@ -122,14 +127,40 @@ public class ProcessExecutionInstance extends ExecutionInstance { continue; } Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed()); - List<Path> paths = new ArrayList<>(); + String startTimeExp = input.getStart(); + String endTimeExp = input.getEnd(); + DateTime processInstanceTime = getInstanceTime(); + expressionHelper.setReferenceDate(new Date(processInstanceTime.getMillis())); + + Date startTime = expressionHelper.evaluate(startTimeExp, Date.class); + Date endTime = expressionHelper.evaluate(endTimeExp, Date.class); + for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) { + org.apache.falcon.entity.v0.cluster.Cluster clusterEntity = + EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName()); + if (!EntityUtil.responsibleFor(clusterEntity.getColo())) { + continue; + } + List<Path> paths = new ArrayList<>(); List<Location> locations = FeedHelper.getLocations(cluster, feed); for (Location loc 
: locations) { if (loc.getType() != LocationType.DATA) { continue; } - paths.add(new Path(loc.getPath())); + List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(), + startTime, endTime); + for (Date instanceTime : instanceTimes) { + String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime); + if (feed.getAvailabilityFlag() != null && !feed.getAvailabilityFlag().isEmpty()) { + if (!path.endsWith("/")) { + path = path + "/"; + } + path = path + feed.getAvailabilityFlag(); + } + if (!paths.contains(new Path(path))) { + paths.add(new Path(path)); + } + } } Predicate predicate = Predicate.createDataPredicate(paths); @@ -137,21 +168,19 @@ public class ProcessExecutionInstance extends ExecutionInstance { if (isResume && !awaitedPredicates.contains(predicate)) { continue; } - // TODO : Revisit this once the Data Notification Service has been built - // TODO Very IMP : Need to change the polling frequency + addDataPredicate(predicate); DataAvailabilityService.DataRequestBuilder requestBuilder = (DataAvailabilityService.DataRequestBuilder) NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA) .createRequestBuilder(executionService, getId()); requestBuilder.setLocations(paths) .setCluster(cluster.getName()) - .setPollingFrequencyInMillis(100) + .setPollingFrequencyInMillis(SchedulerUtil.getPollingFrequencyinMillis(process.getFrequency())) .setTimeoutInMillis(getTimeOutInMillis()) .setLocations(paths); NotificationServicesRegistry.register(requestBuilder.build()); - LOG.info("Registered for a data notification for process {} for data location {}", - process.getName(), StringUtils.join(paths, ",")); - awaitedPredicates.add(predicate); + LOG.info("Registered for a data notification for process {} of instance time {} for data location {}", + process.getName(), getInstanceTime(), StringUtils.join(paths, ",")); } } } @@ -170,22 +199,26 @@ public class ProcessExecutionInstance extends ExecutionInstance { 
case DATA_AVAILABLE: // Data has not become available and the wait time has passed if (((DataEvent) event).getStatus() == DataEvent.STATUS.UNAVAILABLE) { - if (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis())) { - hasTimedOut = true; - } - } else { - // If the event matches any of the awaited predicates, remove the predicate of the awaited list - Predicate toRemove = null; - for (Predicate predicate : awaitedPredicates) { - if (predicate.evaluate(Predicate.getPredicate(event))) { - toRemove = predicate; - break; - } - } - if (toRemove != null) { - awaitedPredicates.remove(toRemove); + hasTimedOut = true; + } + // If the event matches any of the awaited predicates, remove the predicate of the awaited list + Predicate toRemove = null; + synchronized (awaitedPredicates) { + Iterator<Predicate> iterator = awaitedPredicates.iterator(); + while (iterator.hasNext()) { + Predicate predicate = iterator.next(); + if (predicate.evaluate(Predicate.getPredicate(event))) { + toRemove = predicate; + break; } } + if (toRemove != null) { + awaitedPredicates.remove(toRemove); + } + if (awaitedPredicates.size() == 0) { + areDataPredicatesEmpty = true; + } + } break; default: } @@ -200,13 +233,16 @@ public class ProcessExecutionInstance extends ExecutionInstance { if (awaitedPredicates.isEmpty()) { return true; } else { - // If it is waiting to be scheduled, it is in ready. 
- for (Predicate predicate : awaitedPredicates) { - if (!predicate.getType().equals(Predicate.TYPE.JOB_COMPLETION)) { - return false; + synchronized (awaitedPredicates) { + Iterator<Predicate> iterator = awaitedPredicates.iterator(); + while (iterator.hasNext()) { + Predicate predicate = iterator.next(); + if (!predicate.getType().equals(Predicate.TYPE.JOB_COMPLETION)) { + return false; + } } + return true; } - return true; } } @@ -338,4 +374,15 @@ public class ProcessExecutionInstance extends ExecutionInstance { public void rerun() throws FalconException { registerForNotifications(false); } + + public boolean areDataAwaitingPredicatesEmpty() { + return areDataPredicatesEmpty; + } + + protected synchronized void addDataPredicate(Predicate predicate) { + synchronized (awaitedPredicates) { + awaitedPredicates.add(predicate); + areDataPredicatesEmpty = false; + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java index 0fc68f0..fec5f31 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java @@ -33,6 +33,7 @@ import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.exception.InvalidStateTransitionException; import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.notification.service.event.DataEvent; import org.apache.falcon.notification.service.event.Event; import org.apache.falcon.notification.service.event.EventType; import org.apache.falcon.notification.service.event.JobCompletedEvent; @@ -93,8 +94,9 @@ public class 
ProcessExecutor extends EntityExecutor { initInstances(); } // Check to handle restart and restoration from state store. - if (STATE_STORE.getEntity(id.getEntityID()).getCurrentState() != EntityState.STATE.SCHEDULED) { - dryRun(); + EntityState entityState = STATE_STORE.getEntity(id.getEntityID()); + if (entityState.getCurrentState() != EntityState.STATE.SCHEDULED) { + dryRun(entityState.getProperties()); } else { LOG.info("Process, {} was already scheduled on cluster, {}.", process.getName(), cluster); LOG.info("Loading instances for process {} from state store.", process.getName()); @@ -103,8 +105,8 @@ public class ProcessExecutor extends EntityExecutor { registerForNotifications(getLastInstanceTime()); } - private void dryRun() throws FalconException { - DAGEngineFactory.getDAGEngine(cluster).submit(process); + private void dryRun(Properties properties) throws FalconException { + DAGEngineFactory.getDAGEngine(cluster).submit(process, properties); } // Initializes the cache of execution instances. Cache is backed by the state store. 
@@ -419,6 +421,30 @@ public class ProcessExecutor extends EntityExecutor { stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this); } break; + case DATA_AVAILABLE: + instance = instances.get((InstanceID)event.getTarget()); + instance.onEvent(event); + switch (((DataEvent) event).getStatus()) { + case AVAILABLE: + if (instance.areDataAwaitingPredicatesEmpty() && !instance.hasTimedOut) { + LOG.info("Data conditions met for instance {} and scheduled for running ", instance.getId()); + stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this); + } else if (instance.areDataAwaitingPredicatesEmpty()) { + LOG.info("Instance {} timedout since input data not available", instance.getId()); + stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this); + } else { + STATE_STORE.updateExecutionInstance(new InstanceState(instance)); + } + break; + case UNAVAILABLE: + if (instance.areDataAwaitingPredicatesEmpty()) { + stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this); + } + break; + default: + throw new InvalidStateTransitionException("Invalid Data event status."); + } + break; default: if (isTriggerEvent(event)) { instance = buildInstance(event); @@ -473,7 +499,8 @@ public class ProcessExecutor extends EntityExecutor { return event.getTarget().equals(id) || event.getType() == EventType.JOB_COMPLETED || event.getType() == EventType.JOB_SCHEDULED - || event.getType() == EventType.RE_RUN; + || event.getType() == EventType.RE_RUN + || event.getType() == EventType.DATA_AVAILABLE; } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java index 3e7fc9b..236da11 100644 --- 
a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java @@ -18,6 +18,7 @@ package org.apache.falcon.execution; import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.util.RuntimeProperties; import org.joda.time.DateTime; /** @@ -27,6 +28,14 @@ public final class SchedulerUtil { private static final long MINUTE_IN_MS = 60 * 1000L; private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS; + public static final String MINUTELY_PROCESS_FREQUENCY_POLLING_IN_MILLIS = + "falcon.scheduler.minutely.process.polling.frequency.millis"; + public static final String HOURLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS = + "falcon.scheduler.hourly.process.polling.frequency.millis"; + public static final String DAILY_PROCESS_FREQUENCY_POLLING_IN_MILLIS = + "falcon.scheduler.daily.process.polling.frequency.millis"; + public static final String MONTHLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS = + "falcon.scheduler.monthly.process.polling.frequency.millis"; private SchedulerUtil(){}; @@ -51,4 +60,29 @@ public final class SchedulerUtil { throw new IllegalArgumentException("Invalid time unit " + frequency.getTimeUnit().name()); } } + + /** + * + * @param frequency + * @return + */ + public static long getPollingFrequencyinMillis(Frequency frequency) { + switch (frequency.getTimeUnit()) { + case minutes: + return Long.parseLong(RuntimeProperties.get().getProperty(MINUTELY_PROCESS_FREQUENCY_POLLING_IN_MILLIS, + "20000")); + case hours: + return Long.parseLong(RuntimeProperties.get().getProperty(HOURLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS, + "60000")); + case days: + return Long.parseLong(RuntimeProperties.get().getProperty(DAILY_PROCESS_FREQUENCY_POLLING_IN_MILLIS, + "120000")); + case months: + return Long.parseLong(RuntimeProperties.get().getProperty(MONTHLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS, + "180000")); + default: + throw new IllegalArgumentException("Unhandled frequency time unit " + 
frequency.getTimeUnit()); + } + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java index 732da62..1240be9 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java @@ -67,6 +67,9 @@ public class DataAvailabilityService implements FalconNotificationService { public void register(NotificationRequest request) throws NotificationServiceException { LOG.info("Registering Data notification for " + request.getCallbackId().toString()); DataNotificationRequest dataNotificationRequest = (DataNotificationRequest) request; + if (instancesToIgnore.containsKey(dataNotificationRequest.getCallbackId())) { + instancesToIgnore.remove(dataNotificationRequest.getCallbackId()); + } delayQueue.offer(dataNotificationRequest); } @@ -246,7 +249,11 @@ public class DataAvailabilityService implements FalconNotificationService { Map<Path, Boolean> locations) throws IOException { for (Path path : unAvailablePaths) { if (fs.exists(path)) { - locations.put(path, true); + if (locations.containsKey(path)) { + locations.put(path, true); + } else { + locations.put(new Path(path.toUri().getPath()), true); + } } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java 
b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java index 401c57e..a110e64 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -43,6 +43,8 @@ import org.apache.falcon.notification.service.request.JobScheduleNotificationReq import org.apache.falcon.notification.service.request.NotificationRequest; import org.apache.falcon.predicate.Predicate; import org.apache.falcon.state.EntityClusterID; +import org.apache.falcon.state.EntityID; +import org.apache.falcon.state.EntityState; import org.apache.falcon.state.ID; import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.InstanceState; @@ -304,7 +306,9 @@ public class SchedulerService implements FalconNotificationService, Notification DAGEngineFactory.getDAGEngine(instance.getCluster()).reRun(instance, props, isForced); } } else { - externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()).run(instance); + EntityState entityState = STATE_STORE.getEntity(new EntityID(instance.getEntity())); + externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()) + .run(instance, entityState.getProperties()); } LOG.info("Scheduled job {} for instance {}", externalId, instance.getId()); JobScheduledEvent event = new JobScheduledEvent(instance.getId(), http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java index c7dd5d3..9e2b993 100644 --- 
a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java @@ -151,6 +151,15 @@ public class DataNotificationRequest extends NotificationRequest implements Dela if (!locations.equals(that.locations)) { return false; } + if (pollingFrequencyInMillis != (that.pollingFrequencyInMillis)) { + return false; + } + if (timeoutInMillis != that.timeoutInMillis) { + return false; + } + if (createdTimeInMillis != that.createdTimeInMillis) { + return false; + } return true; } @@ -158,8 +167,16 @@ public class DataNotificationRequest extends NotificationRequest implements Dela public int hashCode() { int result = cluster.hashCode(); result = 31 * result + (locations != null ? locations.hashCode() : 0); + result = 31 * result + Long.valueOf(pollingFrequencyInMillis).hashCode(); + result = 31 * result + Long.valueOf(timeoutInMillis).hashCode(); + result = 31 * result + Long.valueOf(createdTimeInMillis).hashCode(); return result; } + @Override + public String toString() { + return "cluster: " + this.getCluster() + " locations: " + this.locations + " createdTime: " + + this.createdTimeInMillis; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java index c248db6..93dcb12 100644 --- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java +++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java @@ -163,6 +163,7 @@ public class Predicate implements Serializable { * @return */ public static Predicate createDataPredicate(List<Path> paths) { + Collections.sort(paths); return new Predicate(TYPE.DATA) .addClause("path", 
StringUtils.join(paths, ",")); } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/EntityState.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java index 38479a4..1b26c7a 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java +++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java @@ -20,6 +20,8 @@ package org.apache.falcon.state; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.InvalidStateTransitionException; +import java.util.Properties; + /** * Represents the state of a schedulable entity. * Implements {@link org.apache.falcon.state.StateMachine} for an entity. @@ -27,8 +29,17 @@ import org.apache.falcon.exception.InvalidStateTransitionException; public class EntityState implements StateMachine<EntityState.STATE, EntityState.EVENT> { private Entity entity; private STATE currentState; + private Properties properties; private static final STATE INITIAL_STATE = STATE.SUBMITTED; + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + /** * Enumerates all the valid states of a schedulable entity and the valid transitions from that state. 
*/ http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java index b862e4d..f5e6f5b 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java +++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java @@ -270,7 +270,11 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance @Override public String toString() { - return instance.getId().toString() + "STATE: " + currentState.toString(); + StringBuilder output = new StringBuilder(); + if (instance.getId() != null) { + output.append(instance.getId()); + } + return output.append("STATE").append(currentState.toString()).toString(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/StateService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java index 638bb6e..8a66fb0 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java +++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java @@ -21,11 +21,14 @@ import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.execution.ProcessExecutionInstance; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.StateStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Properties; + /** * A service that fetches state from state store, handles state transitions 
of entities and instances, * invokes state change handler and finally persists the new state in the state store. @@ -62,14 +65,18 @@ public final class StateService { * @param handler * @throws FalconException */ - public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler) - throws FalconException { + public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler, + Properties props) throws FalconException { EntityID id = new EntityID(entity); if (!stateStore.entityExists(id)) { // New entity if (event == EntityState.EVENT.SUBMIT) { callbackHandler(entity, EntityState.EVENT.SUBMIT, handler); - stateStore.putEntity(new EntityState(entity)); + EntityState entityState = new EntityState(entity); + if (props != null && !props.isEmpty()) { + entityState.setProperties(props); + } + stateStore.putEntity(entityState); LOG.debug("Entity {} submitted due to event {}.", id, event.name()); } else { throw new FalconException("Entity " + id + " does not exist in state store."); @@ -90,6 +97,11 @@ public final class StateService { } } + public void handleStateChange(Entity entity, EntityState.EVENT event, + EntityStateChangeHandler handler) throws FalconException { + handleStateChange(entity, event, handler, null); + } + // Invokes the right method on the state change handler private void callbackHandler(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler) throws FalconException { @@ -133,6 +145,7 @@ public final class StateService { if (event == InstanceState.EVENT.TRIGGER) { callbackHandler(instance, InstanceState.EVENT.TRIGGER, handler); stateStore.putExecutionInstance(new InstanceState(instance)); + ((ProcessExecutionInstance) instance).registerForNotifications(false); LOG.debug("Instance {} triggered due to event {}.", id, event.name()); } else if (event == InstanceState.EVENT.EXTERNAL_TRIGGER) { callbackHandler(instance, InstanceState.EVENT.EXTERNAL_TRIGGER, handler); 
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java index a7deb89..10490e4 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java @@ -45,7 +45,7 @@ public interface EntityStateStore { * @param entityId * @return true, if entity exists in store. */ - boolean entityExists(EntityID entityId) throws StateStoreException;; + boolean entityExists(EntityID entityId) throws StateStoreException; /** * @param state http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java index 3384186..228b1f9 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java @@ -59,7 +59,7 @@ public final class BeanMapperUtil { * @param entityState * @return */ - public static EntityBean convertToEntityBean(EntityState entityState) { + public static EntityBean convertToEntityBean(EntityState entityState) throws IOException { EntityBean entityBean = new EntityBean(); Entity entity = entityState.getEntity(); String id = new EntityID(entity).getKey(); @@ -67,6 +67,10 @@ public final class BeanMapperUtil { entityBean.setName(entity.getName()); entityBean.setState(entityState.getCurrentState().toString()); 
entityBean.setType(entity.getEntityType().toString()); + if (entityState.getProperties() != null && !entityState.getProperties().isEmpty()) { + byte[] props = getProperties(entityState); + entityBean.setProperties(props); + } return entityBean; } @@ -76,11 +80,26 @@ public final class BeanMapperUtil { * @return * @throws StateStoreException */ - public static EntityState convertToEntityState(EntityBean entityBean) throws StateStoreException { + public static EntityState convertToEntityState(EntityBean entityBean) throws StateStoreException, IOException { try { Entity entity = EntityUtil.getEntity(entityBean.getType(), entityBean.getName()); EntityState entityState = new EntityState(entity); entityState.setCurrentState(EntityState.STATE.valueOf(entityBean.getState())); + byte[] result = entityBean.getProperties(); + if (result != null && result.length != 0) { + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(result); + ObjectInputStream in = null; + Properties properties = null; + try { + in = new ObjectInputStream(byteArrayInputStream); + properties = (Properties) in.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } finally { + IOUtils.closeQuietly(in); + } + entityState.setProperties(properties); + } return entityState; } catch (FalconException e) { throw new StateStoreException(e); @@ -94,7 +113,7 @@ public final class BeanMapperUtil { * @throws StateStoreException */ public static Collection<EntityState> convertToEntityState(Collection<EntityBean> entityBeans) - throws StateStoreException { + throws StateStoreException, IOException { List<EntityState> entityStates = new ArrayList<>(); if (entityBeans != null && !entityBeans.isEmpty()) { for (EntityBean entityBean : entityBeans) { @@ -306,6 +325,18 @@ public final class BeanMapperUtil { } } + public static byte [] getProperties(EntityState entityState) throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + 
ObjectOutputStream out = null; + try { + out = new ObjectOutputStream(byteArrayOutputStream); + out.writeObject(entityState.getProperties()); + return byteArrayOutputStream.toByteArray(); + } finally { + IOUtils.closeQuietly(out); + } + } + /** * @param summary * @return A map of state and count given the JQL result. http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java index 38d9217..825fbc1 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java @@ -75,11 +75,16 @@ public final class JDBCStateStore extends AbstractStateStore { if (entityExists(entityID)) { throw new StateStoreException("Entity with key, " + key + " already exists."); } - EntityBean entityBean = BeanMapperUtil.convertToEntityBean(entityState); - EntityManager entityManager = getEntityManager(); - beginTransaction(entityManager); - entityManager.persist(entityBean); - commitAndCloseTransaction(entityManager); + EntityBean entityBean = null; + try { + entityBean = BeanMapperUtil.convertToEntityBean(entityState); + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + entityManager.persist(entityBean); + commitAndCloseTransaction(entityManager); + } catch (IOException e) { + throw new StateStoreException(e); + } } @@ -97,7 +102,11 @@ public final class JDBCStateStore extends AbstractStateStore { if (entityBean == null) { return null; } - return BeanMapperUtil.convertToEntityState(entityBean); + try { + return BeanMapperUtil.convertToEntityState(entityBean); + } catch (IOException e) { + throw new StateStoreException(e); + } } 
private EntityBean getEntityBean(EntityID id) { @@ -133,7 +142,11 @@ public final class JDBCStateStore extends AbstractStateStore { Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ENTITIES); List result = q.getResultList(); entityManager.close(); - return BeanMapperUtil.convertToEntityState(result); + try { + return BeanMapperUtil.convertToEntityState(result); + } catch (IOException e) { + throw new StateStoreException(e); + } } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java index 49e083c..29b3bbb 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java @@ -35,10 +35,11 @@ public interface DAGEngine { * Run an instance for execution. * * @param instance + * @param props * @return * @throws DAGEngineException */ - String run(ExecutionInstance instance) throws DAGEngineException; + String run(ExecutionInstance instance, Properties props) throws DAGEngineException; /** * @param instance @@ -85,9 +86,10 @@ public interface DAGEngine { * Perform dryrun of an instance. * * @param entity + * @param props * @throws DAGEngineException */ - void submit(Entity entity) throws DAGEngineException; + void submit(Entity entity, Properties props) throws DAGEngineException; /** * Returns info about the Job. 
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index c6d4212..6dbec0c 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -68,6 +68,7 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters"; public static final String FALCON_FORCE_RERUN = "falcon.system.force.rerun"; public static final String FALCON_RERUN = "falcon.system.rerun"; + public static final String FALCON_SKIP_DRYRUN = "falcon.system.skip.dryrun"; public static final String FALCON_RESUME = "falcon.system.resume"; private enum JobAction { @@ -85,13 +86,24 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { } @Override - public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> properties) throws FalconException { - EXECUTION_SERVICE.schedule(entity); + public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> suppliedProps) throws FalconException { + Properties props = new Properties(); + if (suppliedProps != null && !suppliedProps.isEmpty()) { + props.putAll(suppliedProps); + } + if (skipDryRun) { + props.put(FalconWorkflowEngine.FALCON_SKIP_DRYRUN, "true"); + } + EXECUTION_SERVICE.schedule(entity, props); } @Override public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException { - DAGEngineFactory.getDAGEngine(clusterName).submit(entity); + Properties props = new Properties(); + if (skipDryRun) { + 
props.put(FalconWorkflowEngine.FALCON_SKIP_DRYRUN, "true"); + } + DAGEngineFactory.getDAGEngine(clusterName).submit(entity, props); } @Override
