FALCON-1213 Base framework of the native scheduler
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4175c54a Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4175c54a Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4175c54a Branch: refs/heads/master Commit: 4175c54a158eeb9883dc192260890eb2b73ad6f1 Parents: 5a55bae Author: Pallavi Rao <[email protected]> Authored: Tue Oct 20 17:38:26 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Oct 20 17:38:26 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/falcon/entity/EntityUtil.java | 34 ++ pom.xml | 6 +- scheduler/pom.xml | 120 ++++ .../falcon/exception/DAGEngineException.java | 48 ++ .../InvalidStateTransitionException.java | 47 ++ .../exception/NotificationServiceException.java | 48 ++ .../falcon/exception/StateStoreException.java | 47 ++ .../apache/falcon/execution/EntityExecutor.java | 111 ++++ .../falcon/execution/ExecutionInstance.java | 180 ++++++ .../execution/FalconExecutionService.java | 214 +++++++ .../falcon/execution/NotificationHandler.java | 34 ++ .../execution/ProcessExecutionInstance.java | 277 +++++++++ .../falcon/execution/ProcessExecutor.java | 460 +++++++++++++++ .../apache/falcon/execution/SchedulerUtil.java | 54 ++ .../service/FalconNotificationService.java | 76 +++ .../service/NotificationServicesRegistry.java | 125 +++++ .../notification/service/event/DataEvent.java | 76 +++ .../notification/service/event/Event.java | 37 ++ .../service/event/JobCompletedEvent.java | 58 ++ .../service/event/JobScheduledEvent.java | 80 +++ .../service/event/TimeElapsedEvent.java | 62 +++ .../notification/service/impl/AlarmService.java | 326 +++++++++++ .../service/impl/DataAvailabilityService.java | 94 ++++ .../service/impl/JobCompletionService.java | 208 +++++++ .../service/impl/SchedulerService.java | 399 +++++++++++++ .../service/request/AlarmRequest.java | 84 +++ 
.../request/DataNotificationRequest.java | 79 +++ .../JobCompletionNotificationRequest.java | 62 +++ .../request/JobScheduleNotificationRequest.java | 60 ++ .../service/request/NotificationRequest.java | 53 ++ .../org/apache/falcon/predicate/Predicate.java | 220 ++++++++ .../org/apache/falcon/state/EntityState.java | 133 +++++ .../falcon/state/EntityStateChangeHandler.java | 59 ++ .../main/java/org/apache/falcon/state/ID.java | 200 +++++++ .../org/apache/falcon/state/InstanceState.java | 250 +++++++++ .../state/InstanceStateChangeHandler.java | 99 ++++ .../org/apache/falcon/state/StateMachine.java | 34 ++ .../org/apache/falcon/state/StateService.java | 185 ++++++ .../falcon/state/store/AbstractStateStore.java | 92 +++ .../falcon/state/store/EntityStateStore.java | 76 +++ .../falcon/state/store/InMemoryStateStore.java | 227 ++++++++ .../falcon/state/store/InstanceStateStore.java | 113 ++++ .../apache/falcon/state/store/StateStore.java | 27 + .../falcon/workflow/engine/DAGEngine.java | 115 ++++ .../workflow/engine/DAGEngineFactory.java | 53 ++ .../workflow/engine/FalconWorkflowEngine.java | 366 ++++++++++++ .../falcon/workflow/engine/OozieDAGEngine.java | 401 +++++++++++++ .../execution/FalconExecutionServiceTest.java | 557 +++++++++++++++++++ .../apache/falcon/execution/MockDAGEngine.java | 122 ++++ .../falcon/execution/SchedulerUtilTest.java | 50 ++ .../notification/service/AlarmServiceTest.java | 77 +++ .../service/SchedulerServiceTest.java | 314 +++++++++++ .../apache/falcon/predicate/PredicateTest.java | 53 ++ .../falcon/state/EntityStateServiceTest.java | 119 ++++ .../falcon/state/InstanceStateServiceTest.java | 138 +++++ .../resources/config/cluster/cluster-0.1.xml | 43 ++ .../src/test/resources/config/feed/feed-0.1.xml | 57 ++ .../resources/config/process/process-0.1.xml | 54 ++ webapp/pom.xml | 6 + 60 files changed, 7800 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bc4fdf5..a4dc1c8 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,8 @@ Trunk (Unreleased) FALCON-1401 MetadataMappingService fails to add an edge for a process instance(Pallavi Rao) NEW FEATURES + FALCON-1213 Base framework of the native scheduler(Pallavi Rao) + FALCON-1315 Update falcon ui for HiveDR, secure clusters and bug fixes(Armando Reyna/Venkat Ranganathan via Sowmya Ramesh) FALCON-1102 Gather data transfer details of filesystem replication(Peeyush Bishnoi via Sowmya Ramesh) http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index 3ab9339..ceefb17 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -85,10 +85,32 @@ public final class EntityUtil { private static final long DAY_IN_MS = 24 * HOUR_IN_MS; private static final long MONTH_IN_MS = 31 * DAY_IN_MS; private static final long ONE_MS = 1; + public static final String MR_JOB_PRIORITY = "jobPriority"; public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; private static final String STAGING_DIR_NAME_SEPARATOR = "_"; + /** Priority with which the DAG will be scheduled. + * Matches the five priorities of Hadoop jobs. 
+ */ + public enum JOBPRIORITY { + VERY_HIGH((short) 1), + HIGH((short) 2), + NORMAL((short) 3), + LOW((short) 4), + VERY_LOW((short) 5); + + private final short priority; + + public short getPriority() { + return priority; + } + + JOBPRIORITY(short priority) { + this.priority = priority; + } + } + private EntityUtil() {} public static <T extends Entity> T getEntity(EntityType type, String entityName) throws FalconException { @@ -1015,4 +1037,16 @@ public final class EntityUtil { } return props; } + /** Returns the priority named by the process property MR_JOB_PRIORITY (name matched case-insensitively, surrounding whitespace ignored), or NORMAL when the property is absent; throws IllegalArgumentException if the value names no JOBPRIORITY constant. */ + public static JOBPRIORITY getPriority(Process process) { + org.apache.falcon.entity.v0.process.Properties processProps = process.getProperties(); + if (processProps != null) { + for (org.apache.falcon.entity.v0.process.Property prop : processProps.getProperties()) { + if (MR_JOB_PRIORITY.equals(prop.getName())) { + return JOBPRIORITY.valueOf(prop.getValue().trim().toUpperCase(java.util.Locale.ROOT)); + } + } + } + return JOBPRIORITY.NORMAL; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 54e6cd1..8cd3c3c 100644 --- a/pom.xml +++ b/pom.xml @@ -110,6 +110,9 @@ <hive.version>0.13.1</hive.version> <jetty.version>6.1.26</jetty.version> <jersey.version>1.9</jersey.version> + <quartz.version>2.2.1</quartz.version> + <joda.version>2.8.2</joda.version> + <mockito.version>1.9.5</mockito.version> <internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo> <excluded.test.groups>exhaustive</excluded.test.groups> </properties> @@ -427,6 +430,7 @@ <module>messaging</module> <module>oozie-el-extensions</module> <module>oozie</module> + <module>scheduler</module> <module>acquisition</module> <module>replication</module> <module>retention</module> @@ -680,7 +684,7 @@ <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> - <version>1.8.5</version> + <version>${mockito.version}</version> <scope>provided</scope> </dependency> 
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/pom.xml ---------------------------------------------------------------------- diff --git a/scheduler/pom.xml b/scheduler/pom.xml new file mode 100644 index 0000000..dddfcce --- /dev/null +++ b/scheduler/pom.xml @@ -0,0 +1,120 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-main</artifactId> + <version>0.8-SNAPSHOT</version> + </parent> + <artifactId>falcon-scheduler</artifactId> + <description>Apache Falcon Scheduler Module</description> + <name>Apache Falcon Scheduler</name> + <packaging>jar</packaging> + + <profiles> + <profile> + <id>hadoop-2</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + </dependencies> + </profile> + </profiles> + + <dependencies> + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-oozie-adaptor</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-messaging</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-test-util</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-client</artifactId> + </dependency> + + <dependency> + <groupId>org.quartz-scheduler</groupId> + <artifactId>quartz</artifactId> + <version>${quartz.version}</version> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + 
<artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>${joda.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/exception/DAGEngineException.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/exception/DAGEngineException.java b/scheduler/src/main/java/org/apache/falcon/exception/DAGEngineException.java new file mode 100644 index 0000000..8b5bb64 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/exception/DAGEngineException.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package org.apache.falcon.exception; import org.apache.falcon.FalconException; /** * Exception thrown by DAG Execution Engine. */ public class DAGEngineException extends FalconException { /** * @param e - the underlying cause, passed on to FalconException */ public DAGEngineException(Throwable e) { super(e); } /** * @param message - custom message * @param e - the underlying cause, passed on to FalconException */ public DAGEngineException(String message, Throwable e) { super(message, e); } /** * @param message - custom message */ public DAGEngineException(String message) { super(message); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/exception/InvalidStateTransitionException.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/exception/InvalidStateTransitionException.java b/scheduler/src/main/java/org/apache/falcon/exception/InvalidStateTransitionException.java new file mode 100644 index 0000000..19284a5 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/exception/InvalidStateTransitionException.java @@ -0,0 +1,47 @@ +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
*/ package org.apache.falcon.exception; import org.apache.falcon.FalconException; /** * Exception thrown during state transition of entities and instances. */ public class InvalidStateTransitionException extends FalconException { /** * @param e - the underlying cause, passed on to FalconException */ public InvalidStateTransitionException(Throwable e) { super(e); } /** * @param message - custom exception message * @param e - the underlying cause, passed on to FalconException */ public InvalidStateTransitionException(String message, Throwable e) { super(message, e); } /** * @param message - custom exception message */ public InvalidStateTransitionException(String message) { super(message); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/exception/NotificationServiceException.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/exception/NotificationServiceException.java b/scheduler/src/main/java/org/apache/falcon/exception/NotificationServiceException.java new file mode 100644 index 0000000..b7f84df --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/exception/NotificationServiceException.java @@ -0,0 +1,48 @@ +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.falcon.exception; import org.apache.falcon.FalconException; /** * Exception thrown by notification services. */ public class NotificationServiceException extends FalconException { /** * @param e - the underlying cause, passed on to FalconException */ public NotificationServiceException(Throwable e) { super(e); } /** * @param message - custom message * @param e - the underlying cause, passed on to FalconException */ public NotificationServiceException(String message, Throwable e) { super(message, e); } /** * @param message - custom message */ public NotificationServiceException(String message) { super(message); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/exception/StateStoreException.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/exception/StateStoreException.java b/scheduler/src/main/java/org/apache/falcon/exception/StateStoreException.java new file mode 100644 index 0000000..93bdad3 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/exception/StateStoreException.java @@ -0,0 +1,47 @@ +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.falcon.exception; import org.apache.falcon.FalconException; /** * Exception thrown by the State store API. */ public class StateStoreException extends FalconException { /** * @param e - the underlying cause, passed on to FalconException */ public StateStoreException(Throwable e) { super(e); } /** * @param message - custom message * @param e - the underlying cause, passed on to FalconException */ public StateStoreException(String message, Throwable e) { super(message, e); } /** * @param message - custom message */ public StateStoreException(String message) { super(message); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java new file mode 100644 index 0000000..9b07b9e --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java @@ -0,0 +1,111 @@ +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
*/ package org.apache.falcon.execution; import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.state.ID; import org.apache.falcon.state.InstanceStateChangeHandler; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.StateStore; /** * This class is responsible for creation of execution instances for a given entity. * An execution instance is created upon receipt of a "trigger event". * It also handles the state transition of each execution instance. * This class is also responsible for handling user interrupts for an entity such as suspend, kill etc. */ public abstract class EntityExecutor implements NotificationHandler, InstanceStateChangeHandler { protected static final ConfigurationStore STORE = ConfigurationStore.get(); /* The number of execution instances to be cached by default. Held as a String; presumably the default value of a string-valued configuration lookup - TODO confirm. */ public static final String DEFAULT_CACHE_SIZE = "20"; /* Name of the cluster on which this executor runs the entity. */ protected String cluster; protected static final StateStore STATE_STORE = AbstractStateStore.get(); protected ID id; /** * Schedules execution instances for the entity. Idempotent operation. * * @throws FalconException - When scheduling fails */ public abstract void schedule() throws FalconException; /** * Suspends all "active" execution instances of the entity. Idempotent operation. * The operation can fail on certain instances. In such cases, the operation is partially successful. * * @throws FalconException - When the operation on an instance fails */ public abstract void suspendAll() throws FalconException; /** * Resumes all suspended execution instances of the entity. Idempotent operation. * The operation can fail on certain instances. In such cases, the operation is partially successful.
* * @throws FalconException - When the operation on an instance fails */ public abstract void resumeAll() throws FalconException; /** * Deletes all execution instances of an entity, even from the store. Idempotent operation. * The operation can fail on certain instances. In such cases, the operation is partially successful. * * @throws FalconException - When the operation on an instance fails */ public abstract void killAll() throws FalconException; /** * Suspends the given execution instance. Idempotent operation. * * @param instance - the execution instance to suspend * @throws FalconException - When the operation on the instance fails */ public abstract void suspend(ExecutionInstance instance) throws FalconException; /** * Resumes the given execution instance. Idempotent operation. * * @param instance - the execution instance to resume * @throws FalconException - When the operation on the instance fails */ public abstract void resume(ExecutionInstance instance) throws FalconException; /** * Kills the given execution instance. Idempotent operation.
* * @param instance - the execution instance to kill * @throws FalconException - When the operation on the instance fails */ public abstract void kill(ExecutionInstance instance) throws FalconException; /** * @return The entity whose execution instances this executor manages */ public abstract Entity getEntity(); /** * @return ID of the entity */ public ID getId() { return id; } } http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java new file mode 100644 index 0000000..3869ff2 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java @@ -0,0 +1,180 @@ +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
*/ package org.apache.falcon.execution; import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.predicate.Predicate; import org.apache.falcon.state.ID; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import java.util.List; import java.util.TimeZone; /** * Represents an execution instance of an entity. * The nominal time and creation time of an instance are always held in UTC. */ public abstract class ExecutionInstance implements NotificationHandler { /* TODO : Add more fields */ private final String cluster; /* External ID is the ID used to identify the Job submitted to the DAG Engine, as returned by the DAG Engine. For example, for Oozie this would be the workflow Id. */ private String externalID; private final DateTime instanceTime; private final DateTime creationTime; private DateTime actualStart; private DateTime actualEnd; private static final DateTimeZone UTC = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC")); /** * @param instanceTime - nominal time of the instance; the same instant is stored in UTC * @param cluster - name of the cluster on which the instance runs */ public ExecutionInstance(DateTime instanceTime, String cluster) { this.instanceTime = new DateTime(instanceTime, UTC); this.cluster = cluster; this.creationTime = DateTime.now(UTC); } /** * For a-periodic instances. The nominal time is the current time, in UTC like the periodic case. * @param cluster - name of the cluster on which the instance runs */ public ExecutionInstance(String cluster) { this.instanceTime = DateTime.now(UTC); this.cluster = cluster; this.creationTime = DateTime.now(UTC); } /** * @return - The external id corresponding to this instance. * If the instance is executed on Oozie, externalID will be the Oozie workflow ID. */ public String getExternalID() { return externalID; } /** * Setter for external ID, Oozie workflow ID, for example. * * @param jobID - the ID of the submitted job, as returned by the DAG Engine */ public void setExternalID(String jobID) { this.externalID = jobID; } /** * @return The unique ID of this instance. The instance is referred to by this ID inside the system.
*/ public abstract ID getId(); /** * @return - The entity to which this instance belongs. */ public abstract Entity getEntity(); /** * @return - The nominal time of the instance, in UTC. */ public DateTime getInstanceTime() { return instanceTime; } /** * @return - The name of the cluster on which this instance is running */ public String getCluster() { return cluster; } /** * @return - The sequential numerical id of the instance */ public abstract int getInstanceSequence(); /** * @return - Actual start time of instance. */ public DateTime getActualStart() { return actualStart; } /** * @param actualStart - time at which the instance actually started */ public void setActualStart(DateTime actualStart) { this.actualStart = actualStart; } /** * @return - Completion time of the instance */ public DateTime getActualEnd() { return actualEnd; } /** * @param actualEnd - time at which the instance completed */ public void setActualEnd(DateTime actualEnd) { this.actualEnd = actualEnd; } /** * @return - Time, in UTC, at which this instance object was created */ public DateTime getCreationTime() { return creationTime; } /** * @return - The gating conditions on which this instance is waiting before it is scheduled for execution. * @throws FalconException */ public abstract List<Predicate> getAwaitingPredicates() throws FalconException; /** * Suspends the instance if it is in one of the active states, waiting, ready or running. * * @throws FalconException */ public abstract void suspend() throws FalconException; /** * Resumes a previously suspended instance. * * @throws FalconException */ public abstract void resume() throws FalconException; /** * Kills an instance if it is in one of the active states, waiting, ready or running. * * @throws FalconException */ public abstract void kill() throws FalconException; /** * Handles any clean up and de-registration of notification subscriptions. * Invoked when the instance reaches one of its terminal states.
* * @throws FalconException */ public abstract void destroy() throws FalconException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java new file mode 100644 index 0000000..b959320 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java @@ -0,0 +1,214 @@ +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
+ */ + +package org.apache.falcon.execution; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.notification.service.event.Event; +import org.apache.falcon.service.FalconService; +import org.apache.falcon.state.EntityState; +import org.apache.falcon.state.EntityStateChangeHandler; +import org.apache.falcon.state.ID; +import org.apache.falcon.state.StateService; +import org.apache.falcon.state.store.AbstractStateStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * This singleton is the entry point for all callbacks from the notification services. + * The execution service handles any system level events that apply to all entities. + * It is responsible for creation of entity executors one per entity, per cluster. 
+ */ +public final class FalconExecutionService implements FalconService, EntityStateChangeHandler, NotificationHandler { + + private static final Logger LOG = LoggerFactory.getLogger(FalconExecutionService.class); + + // Stores all entity executors in memory + private ConcurrentMap<ID, EntityExecutor> executors = new ConcurrentHashMap<ID, EntityExecutor>(); + + private static FalconExecutionService executionService = new FalconExecutionService(); + + @Override + public String getName() { + return "FalconExecutionService"; + } + + public void init() { + LOG.debug("State store instance being used : {}", AbstractStateStore.get()); + // Initialize all executors from store + for (Entity entity : AbstractStateStore.get().getEntities(EntityState.STATE.SCHEDULED)) { + try { + for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { + EntityExecutor executor = createEntityExecutor(entity, cluster); + executors.put(new ID(entity, cluster), executor); + executor.schedule(); + } + } catch (FalconException e) { + LOG.error("Unable to load entity : " + entity.getName(), e); + throw new RuntimeException(e); + } + } + // TODO : During migration, the state store itself may not have been completely bootstrapped. + } + + /** + * Returns an EntityExecutor implementation based on the entity type. 
+ * + * @param entity + * @param cluster + * @return + * @throws FalconException + */ + private EntityExecutor createEntityExecutor(Entity entity, String cluster) throws FalconException { + switch (entity.getEntityType()) { + case FEED: + throw new UnsupportedOperationException("No support yet for feed."); + case PROCESS: + return new ProcessExecutor(((Process)entity), cluster); + default: + throw new IllegalArgumentException("Unhandled type " + entity.getEntityType().name()); + } + } + + @Override + public void destroy() throws FalconException { + + } + + /** + * @return - An instance(singleton) of FalconExecutionService + */ + public static FalconExecutionService get() { + return executionService; + } + + private FalconExecutionService() {} + + @Override + public void onEvent(Event event) throws FalconException { + // Currently, simply passes along the event to the appropriate executor + EntityExecutor executor = executors.get(event.getTarget().getEntityID()); + if (executor == null) { + // The executor has gone away, throw an exception so the notification service knows + throw new FalconException("Target executor for " + event.getTarget().getEntityID() + " does not exist."); + } + executor.onEvent(event); + } + + @Override + public void onSubmit(Entity entity) throws FalconException { + // Do nothing + } + + @Override + public void onSchedule(Entity entity) throws FalconException { + for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { + EntityExecutor executor = createEntityExecutor(entity, cluster); + ID id = new ID(entity, cluster); + executors.put(id, executor); + LOG.info("Scheduling entity {}.", id); + executor.schedule(); + } + } + + @Override + public void onSuspend(Entity entity) throws FalconException { + for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { + EntityExecutor executor = getEntityExecutor(entity, cluster); + executor.suspendAll(); + } + } + + @Override + public void onResume(Entity entity) throws 
FalconException { + for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { + EntityExecutor executor = createEntityExecutor(entity, cluster); + executors.put(new ID(entity, cluster), executor); + LOG.info("Resuming entity, {} of type {} on cluster {}.", entity.getName(), + entity.getEntityType(), cluster); + executor.resumeAll(); + } + } + + /** + * Schedules an entity. + * + * @param entity + * @throws FalconException + */ + public void schedule(Entity entity) throws FalconException { + StateService.get().handleStateChange(entity, EntityState.EVENT.SCHEDULE, this); + } + + /** + * Suspends an entity. + * + * @param entity + * @throws FalconException + */ + public void suspend(Entity entity) throws FalconException { + StateService.get().handleStateChange(entity, EntityState.EVENT.SUSPEND, this); + } + + /** + * Resumes an entity. + * + * @param entity + * @throws FalconException + */ + public void resume(Entity entity) throws FalconException { + StateService.get().handleStateChange(entity, EntityState.EVENT.RESUME, this); + } + + /** + * Deletes an entity from the execution service. + * + * @param entity + * @throws FalconException + */ + public void delete(Entity entity) throws FalconException { + for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { + EntityExecutor executor = getEntityExecutor(entity, cluster); + executor.killAll(); + executors.remove(executor.getId()); + } + } + + /** + * Returns the instance of {@link EntityExecutor} for a given entity and cluster. 
+ * + * @param entity + * @param cluster + * @return + * @throws FalconException + */ + public EntityExecutor getEntityExecutor(Entity entity, String cluster) throws FalconException { + ID id = new ID(entity, cluster); + if (executors.containsKey(id)) { + return executors.get(id); + } else { + throw new FalconException("Entity executor for entity : " + id.getEntityKey() + " does not exist."); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java new file mode 100644 index 0000000..b071f5f --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.execution; + +import org.apache.falcon.FalconException; +import org.apache.falcon.notification.service.event.Event; + +/** + * An interface that every class that handles notifications from notification services must implement. + */ +public interface NotificationHandler { + /** + * The method a notification service calls to onEvent an event. + * + * @param event + * @throws FalconException + */ + void onEvent(Event event) throws FalconException; +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java new file mode 100644 index 0000000..19089c4 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.execution; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Location; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.process.Cluster; +import org.apache.falcon.entity.v0.process.Input; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.notification.service.event.DataEvent; +import org.apache.falcon.notification.service.event.Event; +import org.apache.falcon.notification.service.event.JobCompletedEvent; +import org.apache.falcon.notification.service.event.JobScheduledEvent; +import org.apache.falcon.notification.service.impl.DataAvailabilityService; +import org.apache.falcon.predicate.Predicate; +import org.apache.falcon.state.ID; +import org.apache.falcon.util.RuntimeProperties; +import org.apache.falcon.workflow.engine.DAGEngine; +import org.apache.falcon.workflow.engine.DAGEngineFactory; +import org.apache.hadoop.fs.Path; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +/** + * Represents an execution instance of a process. + * Responsible for user actions such as suspend, resume, kill on individual instances. 
+ */ + +public class ProcessExecutionInstance extends ExecutionInstance { + private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionInstance.class); + private final Process process; + private List<Predicate> awaitedPredicates = new ArrayList<Predicate>(); + private DAGEngine dagEngine = null; + private boolean hasTimedOut = false; + private ID id; + private int instanceSequence; + private final FalconExecutionService executionService = FalconExecutionService.get(); + + /** + * Constructor. + * + * @param process + * @param instanceTime + * @param cluster + * @throws FalconException + */ + public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException { + super(instanceTime, cluster); + this.process = process; + this.id = new ID(process, cluster, getInstanceTime()); + computeInstanceSequence(); + dagEngine = DAGEngineFactory.getDAGEngine(cluster); + registerForNotifications(false); + } + + // Computes the instance number based on the nominal time. + // Method can be extended to assign instance numbers for non-time based instances. + private void computeInstanceSequence() { + for (Cluster processCluster : process.getClusters().getClusters()) { + if (processCluster.getName().equals(getCluster())) { + Date start = processCluster.getValidity().getStart(); + instanceSequence = EntityUtil.getInstanceSequence(start, process.getFrequency(), + process.getTimezone(), getInstanceTime().toDate()); + break; + } + } + } + + // Currently, registers for only data notifications to ensure gating conditions are met. + // Can be extended to register for other notifications. 
+ private void registerForNotifications(boolean isResume) throws FalconException { + if (process.getInputs() == null) { + return; + } + for (Input input : process.getInputs().getInputs()) { + // Register for notification for every required input + if (input.isOptional()) { + continue; + } + Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed()); + for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) { + List<Location> locations = FeedHelper.getLocations(cluster, feed); + for (Location loc : locations) { + if (loc.getType() != LocationType.DATA) { + continue; + } + + Predicate predicate = Predicate.createDataPredicate(loc); + // To ensure we evaluate only predicates not evaluated before when an instance is resumed. + if (isResume && !awaitedPredicates.contains(predicate)) { + continue; + } + // TODO : Revisit this once the Data Availability Service has been built + DataAvailabilityService.DataRequestBuilder requestBuilder = + (DataAvailabilityService.DataRequestBuilder) + NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA) + .createRequestBuilder(executionService, getId()); + requestBuilder.setDataLocation(new Path(loc.getPath())); + NotificationServicesRegistry.register(requestBuilder.build()); + LOG.info("Registered for a data notification for process {} for data location {}", + process.getName(), loc.getPath()); + awaitedPredicates.add(predicate); + } + } + } + } + + @Override + public void onEvent(Event event) throws FalconException { + switch (event.getSource()) { + case JOB_SCHEDULE: + JobScheduledEvent jobScheduleEvent = (JobScheduledEvent) event; + setExternalID(jobScheduleEvent.getExternalID()); + setActualStart(jobScheduleEvent.getStartTime()); + break; + case JOB_COMPLETION: + setActualEnd(((JobCompletedEvent)event).getEndTime()); + break; + case DATA: + // Data has not become available and the wait time has passed + if (((DataEvent) event).getStatus() == 
DataEvent.STATUS.UNAVAILABLE) { + if (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis())) { + hasTimedOut = true; + } + } else { + // If the event matches any of the awaited predicates, remove the predicate of the awaited list + Predicate toRemove = null; + for (Predicate predicate : awaitedPredicates) { + if (predicate.evaluate(Predicate.getPredicate(event))) { + toRemove = predicate; + break; + } + } + if (toRemove != null) { + awaitedPredicates.remove(toRemove); + } + } + break; + default: + } + } + + /** + * Is the instance ready to be scheduled? + * + * @return true when it is not already scheduled or is gated on some conditions. + */ + public boolean isReady() { + if (getExternalID() != null) { + return false; + } + if (awaitedPredicates.isEmpty()) { + return true; + } else { + // If it is waiting to be scheduled, it is in ready. + for (Predicate predicate : awaitedPredicates) { + if (!predicate.getType().equals(Predicate.TYPE.JOB_COMPLETION)) { + return false; + } + } + return true; + } + } + + /** + * Is the instance scheduled for execution? + * + * @return - true if it is scheduled and has not yet completed. + * @throws FalconException + */ + public boolean isScheduled() throws FalconException { + return getExternalID() != null && dagEngine.isScheduled(this); + } + + /** + * Has the instance timed out waiting for gating conditions to be met? 
+ * + * @return + */ + public boolean hasTimedout() { + return hasTimedOut || (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis())); + } + + @Override + public ID getId() { + return id; + } + + @Override + public Entity getEntity() { + return process; + } + + @Override + public int getInstanceSequence() { + return instanceSequence; + } + + @Override + public List<Predicate> getAwaitingPredicates() throws FalconException { + return awaitedPredicates; + } + + @Override + public void suspend() throws FalconException { + if (getExternalID() != null) { + dagEngine.suspend(this); + } + destroy(); + } + + @Override + public void resume() throws FalconException { + // Was already scheduled on the DAGEngine, so resume on DAGEngine if suspended + if (getExternalID() != null) { + dagEngine.resume(this); + } else if (awaitedPredicates.size() != 0) { + // Evaluate any remaining predicates + registerForNotifications(true); + } + } + + @Override + public void kill() throws FalconException { + if (getExternalID() != null) { + dagEngine.kill(this); + } + destroy(); + } + + // If timeout specified in process, uses it. + // Else, defaults to frequency of the entity * timeoutFactor + private long getTimeOutInMillis() { + if (process.getTimeout() == null) { + // Default timeout is the frequency of the entity + int timeoutFactor = Integer.parseInt(RuntimeProperties.get().getProperty("instance.timeout.factor", + "1")); + return SchedulerUtil.getFrequencyInMillis(DateTime.now(), process.getFrequency()) * timeoutFactor; + } else { + // TODO : Should timeout = 0 have a special meaning or should it be disallowed? 
+ return SchedulerUtil.getFrequencyInMillis(DateTime.now(), process.getTimeout()); + } + } + + @Override + public void destroy() throws FalconException { + NotificationServicesRegistry.unregister(executionService, getId()); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java new file mode 100644 index 0000000..68c34e7 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.execution; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.ProcessHelper; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.process.Cluster; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.exception.InvalidStateTransitionException; +import org.apache.falcon.notification.service.NotificationServicesRegistry; +import org.apache.falcon.notification.service.event.Event; +import org.apache.falcon.notification.service.event.JobCompletedEvent; +import org.apache.falcon.notification.service.event.TimeElapsedEvent; +import org.apache.falcon.notification.service.impl.JobCompletionService; +import org.apache.falcon.notification.service.impl.SchedulerService; +import org.apache.falcon.notification.service.impl.AlarmService; +import org.apache.falcon.predicate.Predicate; +import org.apache.falcon.state.EntityState; +import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceState; +import org.apache.falcon.state.StateService; +import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.engine.DAGEngineFactory; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Date; +import java.util.TimeZone; + +/** + * This class is responsible for managing execution instances of a process. + * It caches the active process instances in memory and handles notification events. + * It intercepts all the notification events intended for its instances and passes them along to the instance after + * acting on it, where applicable. 
+ */ +public class ProcessExecutor extends EntityExecutor { + private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutor.class); + protected LoadingCache<ID, ProcessExecutionInstance> instances; + private Predicate triggerPredicate; + private final Process process; + private final StateService stateService = StateService.get(); + private final FalconExecutionService executionService = FalconExecutionService.get(); + + /** + * Constructor per entity, per cluster. + * + * @param proc + * @param clusterName + * @throws FalconException + */ + public ProcessExecutor(Process proc, String clusterName) throws FalconException { + process = proc; + cluster = clusterName; + id = new ID(proc, clusterName); + } + + @Override + public void schedule() throws FalconException { + // Lazy instantiation + if (instances == null) { + initInstances(); + } + // Check to handle restart and restoration from state store. + if (STATE_STORE.getEntity(id).getCurrentState() != EntityState.STATE.SCHEDULED) { + dryRun(); + } else { + LOG.info("Process, {} was already scheduled on cluster, {}.", process.getName(), cluster); + LOG.info("Loading instances for process {} from state store.", process.getName()); + reloadInstances(); + } + registerForNotifications(); + } + + private void dryRun() throws FalconException { + DAGEngineFactory.getDAGEngine(cluster).submit(process); + } + + // Initializes the cache of execution instances. Cache is backed by the state store. 
+ private void initInstances() throws FalconException { + int cacheSize = Integer.parseInt(StartupProperties.get().getProperty("scheduler.instance.cache.size", + DEFAULT_CACHE_SIZE)); + + instances = CacheBuilder.newBuilder() + .maximumSize(cacheSize) + .build(new CacheLoader<ID, ProcessExecutionInstance>() { + @Override + public ProcessExecutionInstance load(ID id) throws Exception { + return (ProcessExecutionInstance) STATE_STORE.getExecutionInstance(id).getInstance(); + } + }); + } + + // Re-load any active instances from state + private void reloadInstances() throws FalconException { + for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster, + InstanceState.getActiveStates())) { + ExecutionInstance instance = instanceState.getInstance(); + LOG.debug("Loading instance {} from state.", instance.getId()); + switch (instanceState.getCurrentState()) { + case RUNNING: + onSchedule(instance); + break; + case READY: + onConditionsMet(instance); + break; + case WAITING: + instance.resume(); + break; + default: // skip + } + instances.put(instance.getId(), (ProcessExecutionInstance) instance); + } + } + + @Override + public void suspendAll() throws FalconException { + NotificationServicesRegistry.unregister(executionService, getId()); + StringBuffer errMsg = new StringBuffer(); + // Only active instances are in memory. Suspend them first. 
+ for (ExecutionInstance instance : instances.asMap().values()) { + try { + suspend(instance); + } catch (FalconException e) { + // Proceed with next + errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage()); + LOG.error("Instance suspend failed for : " + instance.getId(), e); + } + } + for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster, + InstanceState.getActiveStates())) { + ExecutionInstance instance = instanceState.getInstance(); + try { + suspend(instance); + } catch (FalconException e) { + errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage()); + LOG.error("Instance suspend failed for : " + instance.getId(), e); + } + } + // Some errors + if (errMsg.length() != 0) { + throw new FalconException("Some instances failed to suspend : " + errMsg.toString()); + } + } + + @Override + public void resumeAll() throws FalconException { + if (instances == null) { + initInstances(); + } + StringBuffer errMsg = new StringBuffer(); + ArrayList<InstanceState.STATE> states = new ArrayList<InstanceState.STATE>(); + // TODO : Distinguish between individually suspended instance versus suspended entity? 
+ states.add(InstanceState.STATE.SUSPENDED); + // Load cache with suspended instances + for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster, states)) { + ExecutionInstance instance = instanceState.getInstance(); + try { + resume(instance); + } catch (FalconException e) { + errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage()); + LOG.error("Instance suspend failed for : " + instance.getId(), e); + } + } + registerForNotifications(); + // Some errors + if (errMsg.length() != 0) { + throw new FalconException("Some instances failed to resume : " + errMsg.toString()); + } + } + + @Override + public void killAll() throws FalconException { + NotificationServicesRegistry.unregister(executionService, getId()); + StringBuffer errMsg = new StringBuffer(); + // Only active instances are in memory. Kill them first. + for (ExecutionInstance instance : instances.asMap().values()) { + try { + kill(instance); + } catch (FalconException e) { + // Proceed with next + errMsg.append("Instance kill failed for : " + instance.getId() + " due to " + e.getMessage()); + LOG.error("Instance kill failed for : " + instance.getId(), e); + } + } + for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster, + InstanceState.getActiveStates())) { + ExecutionInstance instance = instanceState.getInstance(); + try { + kill(instance); + } catch (FalconException e) { + errMsg.append("Instance kill failed for : " + instance.getId() + " due to " + e.getMessage()); + LOG.error("Instance kill failed for : " + instance.getId(), e); + } + } + // Some errors + if (errMsg.length() != 0) { + throw new FalconException("Some instances failed to kill : " + errMsg.toString()); + } + } + + @Override + public void suspend(ExecutionInstance instance) throws FalconException { + try { + instance.suspend(); + stateService.handleStateChange(instance, InstanceState.EVENT.SUSPEND, this); + } catch (Exception e) { + 
LOG.error("Suspend failed for instance id : " + instance.getId(), e); + throw new FalconException("Suspend failed for instance : " + instance.getId(), e); + } + + } + + @Override + public void resume(ExecutionInstance instance) throws FalconException { + + try { + instance.resume(); + if (((ProcessExecutionInstance) instance).isScheduled()) { + stateService.handleStateChange(instance, InstanceState.EVENT.RESUME_RUNNING, this); + onSchedule(instance); + } else if (((ProcessExecutionInstance) instance).isReady()) { + stateService.handleStateChange(instance, InstanceState.EVENT.RESUME_READY, this); + onConditionsMet(instance); + } else { + stateService.handleStateChange(instance, InstanceState.EVENT.RESUME_WAITING, this); + } + } catch (Exception e) { + LOG.error("Resume failed for instance id : " + instance.getId(), e); + throw new FalconException("Resume failed for instance : " + instance.getId(), e); + } + } + + @Override + public void kill(ExecutionInstance instance) throws FalconException { + try { + // Kill will de-register from notification services + instance.kill(); + stateService.handleStateChange(instance, InstanceState.EVENT.KILL, this); + } catch (Exception e) { + LOG.error("Kill failed for instance id : " + instance.getId(), e); + throw new FalconException("Kill failed for instance : " + instance.getId(), e); + } + } + + @Override + public Entity getEntity() { + return process; + } + + private ProcessExecutionInstance buildInstance(Event event) throws FalconException { + // If a time triggered instance, use nominal time from event + if (event.getSource() == NotificationServicesRegistry.SERVICE.TIME) { + TimeElapsedEvent timeEvent = (TimeElapsedEvent) event; + LOG.debug("Creating a new process instance for nominal time {}.", timeEvent.getInstanceTime()); + return new ProcessExecutionInstance(process, timeEvent.getInstanceTime(), cluster); + } else { + return new ProcessExecutionInstance(process, DateTime.now(), cluster); + } + } + + @Override + public 
void onEvent(Event event) throws FalconException { + try { + // Handle event if applicable + if (shouldHandleEvent(event)) { + handleEvent(event); + } else { + // Else, pass it along to the execution instance + ProcessExecutionInstance instance = instances.get(event.getTarget()); + if (instance != null) { + instance.onEvent(event); + if (instance.isReady()) { + stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this); + } else if (instance.hasTimedout()) { + stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this); + } + } + } + } catch (Exception e) { + throw new FalconException("Unable to handle event with source : " + event.getSource() + " with target:" + + event.getTarget(), e); + } + } + + private void handleEvent(Event event) throws FalconException { + ProcessExecutionInstance instance; + try { + switch (event.getSource()) { + // TODO : Handle cases where scheduling fails. + case JOB_SCHEDULE: + instance = instances.get(event.getTarget()); + instance.onEvent(event); + stateService.handleStateChange(instance, InstanceState.EVENT.SCHEDULE, this); + break; + case JOB_COMPLETION: + instance = instances.get(event.getTarget()); + instance.onEvent(event); + switch (((JobCompletedEvent) event).getStatus()) { + case SUCCEEDED: + stateService.handleStateChange(instance, InstanceState.EVENT.SUCCEED, this); + break; + case FAILED: + stateService.handleStateChange(instance, InstanceState.EVENT.FAIL, this); + break; + case KILLED: + stateService.handleStateChange(instance, InstanceState.EVENT.KILL, this); + break; + case SUSPENDED: + stateService.handleStateChange(instance, InstanceState.EVENT.SUSPEND, this); + break; + default: + throw new InvalidStateTransitionException( + "Job seems to be have been managed outside Falcon."); + } + break; + default: + if (isTriggerEvent(event)) { + instance = buildInstance(event); + stateService.handleStateChange(instance, InstanceState.EVENT.TRIGGER, this); + // This happens where are no 
conditions the instance is waiting on (for example, no data inputs). + if (instance.isReady()) { + stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this); + } + } + } + } catch (Exception ee) { + throw new FalconException("Unable to cache execution instance", ee); + } + } + + // Evaluates the trigger predicate against the current event, to determine if a new instance needs to be triggered. + private boolean isTriggerEvent(Event event) { + try { + return triggerPredicate.evaluate(Predicate.getPredicate(event)); + } catch (FalconException e) { + return false; + } + } + + // Registers for all notifications that should trigger an instance. + // Currently, only time based triggers are handled. + protected void registerForNotifications() throws FalconException { + AlarmService.AlarmRequestBuilder requestBuilder = + (AlarmService.AlarmRequestBuilder) + NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.TIME) + .createRequestBuilder(executionService, getId()); + Cluster processCluster = ProcessHelper.getCluster(process, cluster); + + InstanceState instanceState = STATE_STORE.getLastExecutionInstance(process, cluster); + // If there are no instances, use process's start, else, use last materialized instance's nominal time + Date startTime = (instanceState == null) ? processCluster.getValidity().getStart() + : EntityUtil.getNextInstanceTime(instanceState.getInstance().getInstanceTime().toDate(), + EntityUtil.getFrequency(process), EntityUtil.getTimeZone(process), 1); + Date endTime = processCluster.getValidity().getEnd(); + // TODO : Handle cron based and calendar based time triggers + // TODO : Set execution order details. 
+ requestBuilder.setFrequency(process.getFrequency()) + .setStartTime(new DateTime(startTime)) + .setEndTime(new DateTime(endTime)) + .setTimeZone(TimeZone.getTimeZone("UTC")); + NotificationServicesRegistry.register(requestBuilder.build()); + LOG.info("Registered for a time based notification for process {} with frequency: {}, " + + "start time: {}, end time: {}", process.getName(), process.getFrequency(), startTime, endTime); + triggerPredicate = Predicate.createTimePredicate(startTime.getTime(), endTime.getTime(), -1); + } + + @Override + public ID getId() { + return id; + } + + // This executor must handle any events intended for itself. + // Or, if it is job run or job complete notifications, so it can handle the instance's state transition. + private boolean shouldHandleEvent(Event event) { + return event.getTarget().equals(id) + || event.getSource() == NotificationServicesRegistry.SERVICE.JOB_COMPLETION + || event.getSource() == NotificationServicesRegistry.SERVICE.JOB_SCHEDULE; + } + + @Override + public void onTrigger(ExecutionInstance instance) throws FalconException { + instances.put(new ID(instance), (ProcessExecutionInstance) instance); + } + + @Override + public void onConditionsMet(ExecutionInstance instance) throws FalconException { + // Put process in run queue and register for notification + SchedulerService.JobScheduleRequestBuilder requestBuilder = (SchedulerService.JobScheduleRequestBuilder) + NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE) + .createRequestBuilder(executionService, getId()); + requestBuilder.setInstance(instance); + NotificationServicesRegistry.register(requestBuilder.build()); + } + + @Override + public void onSchedule(ExecutionInstance instance) throws FalconException { + JobCompletionService.JobCompletionRequestBuilder completionRequestBuilder = + (JobCompletionService.JobCompletionRequestBuilder) + 
NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION) + .createRequestBuilder(executionService, getId()); + completionRequestBuilder.setExternalId(instance.getExternalID()); + completionRequestBuilder.setCluster(instance.getCluster()); + NotificationServicesRegistry.register(completionRequestBuilder.build()); + } + + @Override + public void onSuspend(ExecutionInstance instance) throws FalconException { + instances.invalidate(instance.getId()); + } + + @Override + public void onResume(ExecutionInstance instance) throws FalconException { + instances.put(instance.getId(), (ProcessExecutionInstance) instance); + } + + @Override + public void onKill(ExecutionInstance instance) throws FalconException { + instances.invalidate(instance.getId()); + } + + @Override + public void onSuccess(ExecutionInstance instance) throws FalconException { + instance.destroy(); + instances.invalidate(instance.getId()); + } + + @Override + public void onFailure(ExecutionInstance instance) throws FalconException { + instance.destroy(); + instances.invalidate(instance.getId()); + } + + @Override + public void onTimeOut(ExecutionInstance instance) throws FalconException { + instance.destroy(); + instances.invalidate(instance.getId()); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java new file mode 100644 index 0000000..3e7fc9b --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.execution; + +import org.apache.falcon.entity.v0.Frequency; +import org.joda.time.DateTime; + +/** + * Contains utility methods. + */ +public final class SchedulerUtil { + + private static final long MINUTE_IN_MS = 60 * 1000L; + private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS; + + private SchedulerUtil(){}; + + /** + * Returns the frequency in millis from the given time. + * Needs to take the calender into account. 
+ * @param referenceTime + * @param frequency + * @return + */ + public static long getFrequencyInMillis(DateTime referenceTime, Frequency frequency) { + switch (frequency.getTimeUnit()) { + case minutes: + return MINUTE_IN_MS * frequency.getFrequencyAsInt(); + case hours: + return HOUR_IN_MS * frequency.getFrequencyAsInt(); + case days: + return referenceTime.plusDays(frequency.getFrequencyAsInt()).getMillis() - referenceTime.getMillis(); + case months: + return referenceTime.plusMonths(frequency.getFrequencyAsInt()).getMillis() - referenceTime.getMillis(); + default: + throw new IllegalArgumentException("Invalid time unit " + frequency.getTimeUnit().name()); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java new file mode 100644 index 0000000..41d20a8 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service; + +import org.apache.falcon.exception.NotificationServiceException; +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.notification.service.request.NotificationRequest; +import org.apache.falcon.service.FalconService; +import org.apache.falcon.state.ID; + +/** + * An interface that every notification service must implement. + */ +public interface FalconNotificationService extends FalconService { + + /** + * Register for a notification. + * + * @param notifRequest + */ + void register(NotificationRequest notifRequest) throws NotificationServiceException; + + /** + * De-register from receiving notifications. + * @param handler - The notification handler that needs to be de-registered. + * @param callbackID + */ + void unregister(NotificationHandler handler, ID callbackID) throws NotificationServiceException; + + /** + * Creates and returns an implementation of + * {@link RequestBuilder} that is applicable to the service. + * @param handler - The notification handler that needs to be de-registered. + * @param callbackID + * @return + */ + RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID); + + /** + * Builder to build appropriate {@link NotificationRequest}. + * @param <T> + */ + abstract class RequestBuilder<T extends NotificationRequest> { + + protected NotificationHandler handler; + protected ID callbackId; + + public RequestBuilder(NotificationHandler notificationHandler, ID callbackID) { + if (notificationHandler == null) { + throw new IllegalArgumentException("Handler cannot be null."); + } + this.handler = notificationHandler; + this.callbackId = callbackID; + } + + /** + * @return Corresponding {@link NotificationRequest}. 
+ */ + public abstract T build(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/NotificationServicesRegistry.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/NotificationServicesRegistry.java b/scheduler/src/main/java/org/apache/falcon/notification/service/NotificationServicesRegistry.java new file mode 100644 index 0000000..3ffb489 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/NotificationServicesRegistry.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.notification.service; + +import org.apache.falcon.exception.NotificationServiceException; +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.notification.service.request.NotificationRequest; +import org.apache.falcon.service.Services; +import org.apache.falcon.state.ID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A service registry that manages the notification services. 
+ * This class is also responsible for routing any register and unregister calls to the appropriate service. + */ +public final class NotificationServicesRegistry { + private static final Logger LOG = LoggerFactory.getLogger(NotificationServicesRegistry.class); + + /** + * A list of notifiation service that the scheduler framework uses. + */ + public enum SERVICE { + TIME("AlarmService"), + DATA("DataAvailabilityService"), + JOB_COMPLETION("JobCompletionService"), + JOB_SCHEDULE("JobSchedulerService"); + + private final String name; + + private SERVICE(String name) { + this.name = name; + } + + public String toString() { + return name; + } + } + + private NotificationServicesRegistry() { + } + + /** + * Routes the notification request to appropriate service based on the request. + * + * @param notifRequest + */ + public static void register(NotificationRequest notifRequest) throws NotificationServiceException { + FalconNotificationService service = getService(notifRequest.getService()); + service.register(notifRequest); + } + + /** + * De-registers the listener from all services. + * + * @param listenerID + */ + public static void unregister(NotificationHandler handler, ID listenerID) + throws NotificationServiceException { + for (SERVICE service : SERVICE.values()) { + unregisterForNotification(handler, listenerID, service); + } + } + + /** + * @param serviceType - Type of service requested + * @return An instance of {@link org.apache.falcon.notification.service.FalconNotificationService} + */ + public static FalconNotificationService getService(SERVICE serviceType) { + FalconNotificationService service = Services.get().getService(serviceType.toString()); + if (service == null) { + LOG.error("Unable to find service type : {} . Service not registered.", serviceType.toString()); + throw new RuntimeException("Unable to find service : " + serviceType.toString() + + " . 
Service not registered."); + } + return service; + } + + /** + * @param serviceName - Name of service requested + * @return - An instance of {@link org.apache.falcon.notification.service.FalconNotificationService} + * @throws NotificationServiceException + */ + public static FalconNotificationService getService(String serviceName) throws NotificationServiceException { + SERVICE serviceType = null; + for (SERVICE type : SERVICE.values()) { + if (type.toString().equals(serviceName)) { + serviceType = type; + } + } + if (serviceType == null) { + LOG.error("Unable to find service : {}. Not a valid service.", serviceName); + throw new NotificationServiceException("Unable to find service : " + serviceName + + " . Not a valid service."); + } + return getService(serviceType); + } + + /** + * Routes the unregister request to the mentioned service. + * @param handler + * @param listenerID + * @param service + */ + public static void unregisterForNotification(NotificationHandler handler, ID listenerID, SERVICE service) + throws NotificationServiceException { + FalconNotificationService falconNotificationService = getService(service); + falconNotificationService.unregister(handler, listenerID); + } +}
