FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6d313855 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6d313855 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6d313855 Branch: refs/heads/master Commit: 6d3138559e8395ac1140f10197fcd10badf64883 Parents: 6676032 Author: Pallavi Rao <[email protected]> Authored: Thu Nov 26 15:56:14 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Thu Nov 26 15:56:14 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../src/main/resources/falcon/checkstyle.xml | 6 +- .../main/resources/falcon/findbugs-exclude.xml | 15 + common/src/main/resources/startup.properties | 29 ++ pom.xml | 4 + scheduler/pom.xml | 84 ++++ .../falcon/execution/ExecutionInstance.java | 41 +- .../execution/FalconExecutionService.java | 24 +- .../execution/ProcessExecutionInstance.java | 60 ++- .../org/apache/falcon/predicate/Predicate.java | 50 ++- .../org/apache/falcon/state/EntityState.java | 30 ++ .../org/apache/falcon/state/InstanceID.java | 19 + .../org/apache/falcon/state/InstanceState.java | 32 ++ .../org/apache/falcon/state/StateService.java | 1 + .../falcon/state/store/AbstractStateStore.java | 2 +- .../falcon/state/store/EntityStateStore.java | 14 +- .../falcon/state/store/InMemoryStateStore.java | 22 +- .../falcon/state/store/InstanceStateStore.java | 18 +- .../apache/falcon/state/store/StateStore.java | 6 +- .../falcon/state/store/jdbc/BeanMapperUtil.java | 271 ++++++++++++ .../falcon/state/store/jdbc/EntityBean.java | 104 +++++ .../falcon/state/store/jdbc/InstanceBean.java | 199 +++++++++ .../falcon/state/store/jdbc/JDBCStateStore.java | 416 ++++++++++++++++++ .../state/store/service/FalconJPAService.java | 171 ++++++++ .../falcon/tools/FalconStateStoreDBCLI.java | 435 +++++++++++++++++++ .../src/main/resources/META-INF/persistence.xml | 50 +++ .../main/resources/falcon-buildinfo.properties | 28 ++ .../execution/FalconExecutionServiceTest.java | 53 ++- .../service/SchedulerServiceTest.java | 13 +- .../falcon/state/AbstractSchedulerTestBase.java | 71 +++ .../falcon/state/EntityStateServiceTest.java | 39 +- .../falcon/state/InstanceStateServiceTest.java | 24 +- .../state/service/TestFalconJPAService.java | 64 +++ .../state/service/store/TestJDBCStateStore.java | 397 +++++++++++++++++ .../falcon/tools/TestFalconStateStoreDBCLI.java | 89 ++++ scheduler/src/test/resources/startup.properties | 154 +++++++ src/bin/falcon-db.sh | 49 +++ src/conf/startup.properties | 22 +- src/main/assemblies/distributed-package.xml | 5 + src/main/assemblies/standalone-package.xml | 5 + unit/src/main/resources/startup.properties | 18 + 41 files changed, 3071 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 13d3439..31a2566 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri via Pallavi Rao) + FALCON-1588 Add ability to provide the path for recipe files in command line(Peeyush Bishnoi via Ajay Yadava) FALCON-1573 Supply user-defined properties to Oozie workflows during schedule(Daniel Del Castillo via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/checkstyle/src/main/resources/falcon/checkstyle.xml ---------------------------------------------------------------------- diff --git a/checkstyle/src/main/resources/falcon/checkstyle.xml b/checkstyle/src/main/resources/falcon/checkstyle.xml index 2130e73..292a0a3 100644 --- a/checkstyle/src/main/resources/falcon/checkstyle.xml +++ b/checkstyle/src/main/resources/falcon/checkstyle.xml @@ -230,9 +230,9 @@ <!-- allow warnings to be suppressed --> <module name="SuppressionCommentFilter"> - <property name="offCommentFormat" value="SUSPEND CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/> - <property name="onCommentFormat" value="RESUME CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/> - <property name="checkFormat" value="ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/> + <property name="offCommentFormat" value="SUSPEND CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName|LineLengthCheck"/> + <property name="onCommentFormat" value="RESUME CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName|LineLengthCheck"/> + <property name="checkFormat" value="ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName|LineLengthCheck"/> </module> </module> http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/checkstyle/src/main/resources/falcon/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/checkstyle/src/main/resources/falcon/findbugs-exclude.xml b/checkstyle/src/main/resources/falcon/findbugs-exclude.xml index 0a7580d..e1a5a2e 100644 --- a/checkstyle/src/main/resources/falcon/findbugs-exclude.xml +++ b/checkstyle/src/main/resources/falcon/findbugs-exclude.xml @@ -31,4 +31,19 @@ <Match> <Bug pattern="DM_DEFAULT_ENCODING" /> </Match> + + <Match> + <Class name="org.apache.falcon.tools.FalconStateStoreDBCLI" /> + <Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE" /> + </Match> + + <Match> + <Class name="org.apache.falcon.state.store.jdbc.EntityBean" /> + <Bug pattern="NP_BOOLEAN_RETURN_NULL" /> + </Match> + + <Match> + <Class name="org.apache.falcon.state.store.jdbc.InstanceBean" /> + <Bug pattern="NP_BOOLEAN_RETURN_NULL" /> + </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index cc5212a..3f1ed03 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -43,6 +43,14 @@ org.apache.falcon.service.LogCleanupService,\ org.apache.falcon.service.GroupsService,\ org.apache.falcon.service.ProxyUserService +## If you wish to use Falcon native scheduler add the commented out services below to application.services ## +# org.apache.falcon.notification.service.impl.JobCompletionService,\ +# org.apache.falcon.notification.service.impl.SchedulerService,\ +# org.apache.falcon.notification.service.impl.AlarmService,\ +# org.apache.falcon.notification.service.impl.DataAvailabilityService,\ +# org.apache.falcon.execution.FalconExecutionService,\ +# org.apache.falcon.state.store.service.FalconJPAService + # List of Lifecycle policies configured. *.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete @@ -55,6 +63,8 @@ org.apache.falcon.entity.store.FeedLocationStore,\ org.apache.falcon.service.FeedSLAMonitoringService,\ org.apache.falcon.service.SharedLibraryHostingService +## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ## +# org.apache.falcon.state.store.jdbc.JdbcStateStore ##### JMS MQ Broker Implementation class ##### *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory @@ -247,3 +257,22 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle # Setting monitoring plugin, if SMTP parameters is defined #*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\ # org.apache.falcon.plugin.EmailNotificationPlugin + +######### StateStore Properties ##### +#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore +#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver +#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true +#*.falcon.statestore.jdbc.username=sa +#*.falcon.statestore.jdbc.password= +#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource +## Maximum number of active connections that can be allocated from this pool at the same time. +#*.falcon.statestore.pool.max.active.conn=10 +#*.falcon.statestore.connection.properties= +## Indicates the interval (in milliseconds) between eviction runs. +#*.falcon.statestore.validate.db.connection.eviction.interval=300000 +## The number of objects to examine during each run of the idle object evictor thread. +#*.falcon.statestore.validate.db.connection.eviction.num=10 +## Creates Falcon DB. +## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP. +## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up. +#*.falcon.statestore.create.db.schema=true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index fad8902..678c87c 100644 --- a/pom.xml +++ b/pom.xml @@ -113,6 +113,10 @@ <quartz.version>2.2.1</quartz.version> <joda.version>2.8.2</joda.version> <mockito.version>1.9.5</mockito.version> + <openjpa.version>2.4.0</openjpa.version> + <javax-validation.version>1.0.0.GA</javax-validation.version> + <derby.version>10.10.1.1</derby.version> + <commons-dbcp.version>1.4</commons-dbcp.version> <internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo> <excluded.test.groups>exhaustive</excluded.test.groups> </properties> http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/pom.xml ---------------------------------------------------------------------- diff --git a/scheduler/pom.xml b/scheduler/pom.xml index 20a91d2..336997d 100644 --- a/scheduler/pom.xml +++ b/scheduler/pom.xml @@ -88,6 +88,33 @@ </dependency> <dependency> + <groupId>org.apache.openjpa</groupId> + <artifactId>openjpa-persistence</artifactId> + <version>${openjpa.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.openjpa</groupId> + <artifactId>openjpa-jdbc</artifactId> + <version>${openjpa.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.openjpa</groupId> + <artifactId>openjpa-persistence-jdbc</artifactId> + <version>${openjpa.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>javax.validation</groupId> + <artifactId>validation-api</artifactId> + <version>${javax-validation.version}</version> + </dependency> + + <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> </dependency> @@ -98,11 +125,18 @@ <version>${mockito.version}</version> <scope>test</scope> </dependency> + <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>${joda.version}</version> </dependency> + + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>10.10.1.1</version> + </dependency> </dependencies> <build> @@ -115,6 +149,56 @@ <target>1.7</target> </configuration> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.8</version> + <executions> + <execution> + <phase>process-classes</phase> + <configuration> + <tasks> + <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask" classpathref="maven.compile.classpath"/> + <openjpac> + <classpath refid="maven.compile.classpath"/> + </openjpac> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <artifactItems> + <artifactItem> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + </artifactItem> + <artifactItem> + <groupId>commons-dbcp</groupId> + <artifactId>commons-dbcp</artifactId> + <version>${commons-dbcp.version}</version> + </artifactItem> + </artifactItems> + <outputDirectory>${project.build.directory}/dependency</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java index 2d6b67d..5f96d3f 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java @@ -26,7 +26,6 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import java.util.List; -import java.util.TimeZone; /** * Represents an execution instance of an entity. @@ -38,20 +37,31 @@ public abstract class ExecutionInstance implements NotificationHandler { // External ID is the ID used to identify the Job submitted to the DAG Engine, as returned by the DAG Engine. // For example, for Oozie this would be the workflow Id. private String externalID; + // Time at which instance has to be run. private final DateTime instanceTime; + // Time at which instance is created. private final DateTime creationTime; private DateTime actualStart; private DateTime actualEnd; - private static final DateTimeZone UTC = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC")); + protected static final DateTimeZone UTC = DateTimeZone.UTC; /** - * @param instanceTime + * @param instanceTime Time at which instance has to be run. * @param cluster + * @param creationTime Time at which instance is created to run. */ - public ExecutionInstance(DateTime instanceTime, String cluster) { + public ExecutionInstance(DateTime instanceTime, String cluster, DateTime creationTime) { this.instanceTime = new DateTime(instanceTime, UTC); this.cluster = cluster; - this.creationTime = DateTime.now(UTC); + this.creationTime = new DateTime(creationTime, UTC); + } + + /** + * @param instanceTime + * @param cluster + */ + public ExecutionInstance(DateTime instanceTime, String cluster) { + this(instanceTime, cluster, DateTime.now()); } /** @@ -92,7 +102,7 @@ public abstract class ExecutionInstance implements NotificationHandler { public abstract Entity getEntity(); /** - * @return - The nominal time of the instance. + * @return - The instance time of the instance. */ public DateTime getInstanceTime() { return instanceTime; @@ -138,16 +148,31 @@ public abstract class ExecutionInstance implements NotificationHandler { this.actualEnd = actualEnd; } - + /** + * Creation time of an instance. + * @return + */ public DateTime getCreationTime() { return creationTime; } /** + * Set the gating conditions on which this instance is waiting before it is scheduled for execution. + * @param predicates + */ + public abstract void setAwaitingPredicates(List<Predicate> predicates); + + /** * @return - The gating conditions on which this instance is waiting before it is scheduled for execution. * @throws FalconException */ - public abstract List<Predicate> getAwaitingPredicates() throws FalconException; + public abstract List<Predicate> getAwaitingPredicates(); + + /** + * set the sequential numerical id of the instance. + */ + public abstract void setInstanceSequence(int sequence); + /** * Suspends the instance if it is in one of the active states, waiting, ready or running. http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java index b48a65b..b6741a4 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java @@ -22,6 +22,7 @@ import org.apache.falcon.FalconException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.notification.service.event.Event; import org.apache.falcon.service.FalconService; import org.apache.falcon.state.EntityClusterID; @@ -58,17 +59,22 @@ public final class FalconExecutionService implements FalconService, EntityStateC public void init() { LOG.debug("State store instance being used : {}", AbstractStateStore.get()); // Initialize all executors from store - for (Entity entity : AbstractStateStore.get().getEntities(EntityState.STATE.SCHEDULED)) { - try { - for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { - EntityExecutor executor = createEntityExecutor(entity, cluster); - executors.put(new EntityClusterID(entity, cluster), executor); - executor.schedule(); + try { + for (Entity entity : AbstractStateStore.get().getEntities(EntityState.STATE.SCHEDULED)) { + try { + for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { + EntityExecutor executor = createEntityExecutor(entity, cluster); + executors.put(new EntityClusterID(entity, cluster), executor); + executor.schedule(); + } + } catch (FalconException e) { + LOG.error("Unable to load entity : " + entity.getName(), e); + throw new RuntimeException(e); } - } catch (FalconException e) { - LOG.error("Unable to load entity : " + entity.getName(), e); - throw new RuntimeException(e); } + } catch (StateStoreException e) { + LOG.error("Unable to get Entities from State Store ", e); + throw new RuntimeException(e); } // TODO : During migration, the state store itself may not have been completely bootstrapped. } http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java index 434f168..cff4a73 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java @@ -49,6 +49,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; + /** * Represents an execution instance of a process. * Responsible for user actions such as suspend, resume, kill on individual instances. @@ -57,7 +58,7 @@ import java.util.List; public class ProcessExecutionInstance extends ExecutionInstance { private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionInstance.class); private final Process process; - private List<Predicate> awaitedPredicates = new ArrayList<Predicate>(); + private List<Predicate> awaitedPredicates = new ArrayList<>(); private DAGEngine dagEngine = null; private boolean hasTimedOut = false; private InstanceID id; @@ -72,8 +73,9 @@ public class ProcessExecutionInstance extends ExecutionInstance { * @param cluster * @throws FalconException */ - public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException { - super(instanceTime, cluster); + public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster, + DateTime creationTime) throws FalconException { + super(instanceTime, cluster, creationTime); this.process = process; this.id = new InstanceID(process, cluster, getInstanceTime()); computeInstanceSequence(); @@ -81,7 +83,18 @@ public class ProcessExecutionInstance extends ExecutionInstance { registerForNotifications(false); } - // Computes the instance number based on the nominal time. + /** + * + * @param process + * @param instanceTime + * @param cluster + * @throws FalconException + */ + public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException { + this(process, instanceTime, cluster, DateTime.now(UTC)); + } + + // Computes the instance number based on the instance Time. // Method can be extended to assign instance numbers for non-time based instances. private void computeInstanceSequence() { for (Cluster processCluster : process.getClusters().getClusters()) { @@ -225,11 +238,21 @@ public class ProcessExecutionInstance extends ExecutionInstance { } @Override - public List<Predicate> getAwaitingPredicates() throws FalconException { + public void setAwaitingPredicates(List<Predicate> predicates) { + this.awaitedPredicates = predicates; + } + + @Override + public List<Predicate> getAwaitingPredicates() { return awaitedPredicates; } @Override + public void setInstanceSequence(int sequence) { + this.instanceSequence = sequence; + } + + @Override public void suspend() throws FalconException { if (getExternalID() != null) { dagEngine.suspend(this); @@ -242,7 +265,7 @@ public class ProcessExecutionInstance extends ExecutionInstance { // Was already scheduled on the DAGEngine, so resume on DAGEngine if suspended if (getExternalID() != null) { dagEngine.resume(this); - } else if (awaitedPredicates.size() != 0) { + } else if (awaitedPredicates != null && !awaitedPredicates.isEmpty()) { // Evaluate any remaining predicates registerForNotifications(true); } @@ -271,6 +294,31 @@ public class ProcessExecutionInstance extends ExecutionInstance { } @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || !o.getClass().equals(this.getClass())) { + return false; + } + + ProcessExecutionInstance processExecutionInstance = (ProcessExecutionInstance) o; + + return this.getId().equals(processExecutionInstance.getId()) + && Predicate.isEqualAwaitingPredicates(this.getAwaitingPredicates(), + processExecutionInstance.getAwaitingPredicates()) + && this.getInstanceSequence() == (processExecutionInstance.getInstanceSequence()); + } + + @Override + public int hashCode() { + int result = id != null ? id.hashCode() : 0; + result = 31 * result + (awaitedPredicates != null ? awaitedPredicates.hashCode() : 0); + result = 31 * result + instanceSequence; + return result; + } + + @Override public void destroy() throws FalconException { NotificationServicesRegistry.unregister(executionService, getId()); } http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java index fb4c8c9..164fb0e 100644 --- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java +++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java @@ -27,14 +27,19 @@ import org.apache.falcon.notification.service.event.TimeElapsedEvent; import org.apache.falcon.state.ID; import java.io.Serializable; -import java.util.HashMap; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.TreeMap; /** * Represents the gating condition for which an instance is waiting before it is scheduled. * This will be serialized and stored in state store. */ public class Predicate implements Serializable { + /** * Type of predicate, currently data and time are supported. */ @@ -47,7 +52,10 @@ public class Predicate implements Serializable { private final TYPE type; // A key-value pair of clauses that need make this predicate. - private Map<String, Comparable> clauses = new HashMap<String, Comparable>(); + private Map<String, Comparable> clauses = new TreeMap<>(); + + // Id for a predicate used for comparison. + private String id; // A generic "any" object that can be used when a particular key is allowed to have any value. public static final Comparable<? extends Serializable> ANY = new Any(); @@ -59,6 +67,10 @@ public class Predicate implements Serializable { return type; } + public String getId() { + return id; + } + /** * @param key * @return the value corresponding to the key @@ -106,6 +118,7 @@ public class Predicate implements Serializable { */ public Predicate(TYPE type) { this.type = type; + this.id = this.type + String.valueOf(System.currentTimeMillis()); } /** @@ -120,7 +133,7 @@ public class Predicate implements Serializable { * @param rhs - The value in the key-value pair of a clause * @return This instance */ - public Predicate addClause(String lhs, Comparable<? extends Serializable> rhs) { + Predicate addClause(String lhs, Comparable<? extends Serializable> rhs) { clauses.put(lhs, rhs); return this; } @@ -217,4 +230,35 @@ public class Predicate implements Serializable { return super.hashCode(); } } + + public static boolean isEqualAwaitingPredicates(List<Predicate> thisAwaitingPredicates, + List<Predicate> otherAwaitingPredicates) { + if (thisAwaitingPredicates == null && otherAwaitingPredicates == null) { + return true; + } else if (thisAwaitingPredicates != null && otherAwaitingPredicates != null) { + if (thisAwaitingPredicates.size() != otherAwaitingPredicates.size()) { + return false; + } + Collections.sort(thisAwaitingPredicates, new PredicateComparator()); + Collections.sort(otherAwaitingPredicates, new PredicateComparator()); + + Iterator<Predicate> thisIterator = thisAwaitingPredicates.iterator(); + Iterator<Predicate> otherIterator = otherAwaitingPredicates.iterator(); + + while (thisIterator.hasNext()) { + if (!thisIterator.next().evaluate(otherIterator.next())) { + return false; + } + } + return true; + } + return false; + } + + static class PredicateComparator implements Serializable, Comparator<Predicate> { + @Override + public int compare(Predicate o1, Predicate o2) { + return o1.getId().compareTo(o2.getId()); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/EntityState.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java index 15aea9a..f44f174 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java +++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java @@ -130,4 +130,34 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState. public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { return currentState.nextTransition(event); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + EntityState other = (EntityState) o; + + if (this.getCurrentState() != null ? !this.getCurrentState().equals(other.getCurrentState()) + : other.getCurrentState() != null) { + return false; + } + + if (this.getEntity() != null ? !this.getEntity().equals(other.getEntity()) + : other.getEntity() != null) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int result = currentState != null ? currentState.hashCode() : 0; + result = 31 * result + (entity != null ? entity.hashCode() : 0); + return result; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java index a722be9..72cfc33 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java +++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java @@ -80,4 +80,23 @@ public class InstanceID extends ID { public EntityClusterID getEntityClusterID() { return new EntityClusterID(entityType, entityName, clusterName); } + + public static EntityType getEntityType(String id) { + if (id == null) { + return null; + } + String[] values = id.split(KEY_SEPARATOR); + String entityType = values[0]; + return EntityType.valueOf(entityType); + } + + public static String getEntityName(String id) { + if (id == null) { + return null; + } + String[] values = id.split(KEY_SEPARATOR); + String entityName = values[1]; + return entityName; + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java index ada9d2b..7f2bda9 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java +++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java @@ -252,4 +252,36 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance public String toString() { return instance.getId().toString() + "STATE: " + currentState.toString(); } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + InstanceState other = (InstanceState) o; + + if (this.getCurrentState() != null ? !this.getCurrentState().equals(other.getCurrentState()) + : other.getCurrentState() != null) { + return false; + } + + if (this.getInstance() != null ? !this.getInstance().equals(other.getInstance()) + : other.getInstance() != null) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int result = currentState != null ? currentState.hashCode() : 0; + result = 31 * result + (instance != null ? instance.hashCode() : 0); + return result; + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/StateService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java index c1671ac..c702cc3 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java +++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java @@ -136,6 +136,7 @@ public final class StateService { InstanceState instanceState = stateStore.getExecutionInstance(id); InstanceState.STATE newState = instanceState.nextTransition(event); callbackHandler(instance, event, handler); + instanceState = new InstanceState(instance); instanceState.setCurrentState(newState); stateStore.updateExecutionInstance(instanceState); LOG.debug("State of instance: {} changed to: {} as a result of event: {}.", id, http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java index e36f85c..2d576e5 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java @@ -79,7 +79,7 @@ public abstract class AbstractStateStore implements StateStore, ConfigurationCha */ public static synchronized StateStore get() { if (stateStore == null) { - String storeImpl = StartupProperties.get().getProperty("state.store.impl", + String storeImpl = StartupProperties.get().getProperty("falcon.state.store.impl", "org.apache.falcon.state.store.InMemoryStateStore"); try { stateStore = ReflectionUtils.getInstanceByClassName(storeImpl); http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java index 113f4c5..75a315f 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java @@ -45,18 +45,18 @@ public interface EntityStateStore { * @param entityId * @return true, if entity exists in store. */ - boolean entityExists(EntityID entityId); + boolean entityExists(EntityID entityId) throws StateStoreException;; /** * @param state * @return Entities in a given state. */ - Collection<Entity> getEntities(EntityState.STATE state); + Collection<Entity> getEntities(EntityState.STATE state) throws StateStoreException; /** * @return All Entities in the store. */ - Collection<EntityState> getAllEntities(); + Collection<EntityState> getAllEntities() throws StateStoreException; /** * Update an existing entity with the new values. @@ -73,4 +73,12 @@ public interface EntityStateStore { * @throws StateStoreException */ void deleteEntity(EntityID entityId) throws StateStoreException; + + + /** + * Removes all entities and its instances from the store. + * + * @throws StateStoreException + */ + void deleteEntities() throws StateStoreException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java index 52b3bb8..7ab996a 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java @@ -50,8 +50,7 @@ public final class InMemoryStateStore extends AbstractStateStore { private static final StateStore STORE = new InMemoryStateStore(); - private InMemoryStateStore() { - } + private InMemoryStateStore() {} public static StateStore get() { return STORE; @@ -114,6 +113,11 @@ public final class InMemoryStateStore extends AbstractStateStore { } @Override + public void deleteEntities() throws StateStoreException { + entityStates.clear(); + } + + @Override public void putExecutionInstance(InstanceState instanceState) throws StateStoreException { String key = new InstanceID(instanceState.getInstance()).getKey(); if (instanceStates.containsKey(key)) { @@ -223,6 +227,20 @@ public final class InMemoryStateStore extends AbstractStateStore { } } + @Override + public void deleteExecutionInstances() { + instanceStates.clear(); + } + + @Override + public void deleteExecutionInstance(InstanceID instanceID) throws StateStoreException { + if (!instanceStates.containsKey(instanceID.toString())) { + throw new StateStoreException("Instance with key, " + instanceID.toString() + " does not exist."); + } + instanceStates.remove(instanceID.toString()); + } + + @Override public void clear() { entityStates.clear(); instanceStates.clear(); http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java index 483d9e6..f1d1931 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java @@ -104,12 +104,26 @@ public interface InstanceStateStore { * @param instanceId * @return true, if instance exists. */ - boolean executionInstanceExists(InstanceID instanceId); + boolean executionInstanceExists(InstanceID instanceId) throws StateStoreException; /** * Delete instances of a given entity. * * @param entityId */ - void deleteExecutionInstances(EntityID entityId); + void deleteExecutionInstances(EntityID entityId) throws StateStoreException; + + + /** + * Delete an instance based on ID. + * + * @param instanceID + * @throws StateStoreException + */ + void deleteExecutionInstance(InstanceID instanceID) throws StateStoreException; + + /** + * Delete all instances. + */ + void deleteExecutionInstances(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java index f595c26..592e1fb 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java @@ -17,11 +17,15 @@ */ package org.apache.falcon.state.store; +import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.service.ConfigurationChangeListener; /** * Interface that combines entity, instance store APIs and also config change listener's. */ public interface StateStore extends ConfigurationChangeListener, EntityStateStore, InstanceStateStore { - + /** + * Deletes all entities and instances. + */ + void clear() throws StateStoreException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java new file mode 100644 index 0000000..4bee269 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state.store.jdbc; + +import org.apache.commons.io.IOUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.exception.StateStoreException; +import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.execution.ProcessExecutionInstance; +import org.apache.falcon.predicate.Predicate; +import org.apache.falcon.state.EntityID; +import org.apache.falcon.state.EntityState; +import org.apache.falcon.state.InstanceID; +import org.apache.falcon.state.InstanceState; +import org.joda.time.DateTime; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Mapping util for Persistent Store. + */ +public final class BeanMapperUtil { + private BeanMapperUtil() { + } + + /** + * Converts Entity object to EntityBean which will be stored in DB. + * @param entityState + * @return + */ + public static EntityBean convertToEntityBean(EntityState entityState) { + EntityBean entityBean = new EntityBean(); + Entity entity = entityState.getEntity(); + String id = new EntityID(entity).getKey(); + entityBean.setId(id); + entityBean.setName(entity.getName()); + entityBean.setState(entityState.getCurrentState().toString()); + entityBean.setType(entity.getEntityType().toString()); + return entityBean; + } + + /** + * Converts EntityBean of Data Base to EntityState. + * @param entityBean + * @return + * @throws StateStoreException + */ + public static EntityState convertToEntityState(EntityBean entityBean) throws StateStoreException { + try { + Entity entity = EntityUtil.getEntity(entityBean.getType(), entityBean.getName()); + EntityState entityState = new EntityState(entity); + entityState.setCurrentState(EntityState.STATE.valueOf(entityBean.getState())); + return entityState; + } catch (FalconException e) { + throw new StateStoreException(e); + } + } + + /** + * Converts list of EntityBeans of Data Base to EntityStates. + * @param entityBeans + * @return + * @throws StateStoreException + */ + public static Collection<EntityState> convertToEntityState(Collection<EntityBean> entityBeans) + throws StateStoreException { + List<EntityState> entityStates = new ArrayList<>(); + if (entityBeans != null && !entityBeans.isEmpty()) { + for (EntityBean entityBean : entityBeans) { + entityStates.add(convertToEntityState(entityBean)); + } + } + return entityStates; + } + + /** + * Converts list of EntityBeans of Data Base to Entities. + * @param entityBeans + * @return + * @throws StateStoreException + */ + public static Collection<Entity> convertToEntities(Collection<EntityBean> entityBeans) throws StateStoreException { + List<Entity> entities = new ArrayList<>(); + try { + if (entityBeans != null && !entityBeans.isEmpty()) { + for (EntityBean entityBean : entityBeans) { + Entity entity = EntityUtil.getEntity(entityBean.getType(), entityBean.getName()); + entities.add(entity); + } + } + return entities; + } catch (FalconException e) { + throw new StateStoreException(e); + } + } + + /** + * Convert instance of Entity's instance to InstanceBean of DB. + * @param instanceState + * @return + * @throws StateStoreException + * @throws IOException + */ + public static InstanceBean convertToInstanceBean(InstanceState instanceState) throws StateStoreException, + IOException { + InstanceBean instanceBean = new InstanceBean(); + ExecutionInstance instance = instanceState.getInstance(); + if (instance.getActualEnd() != null) { + instanceBean.setActualEndTime(new Timestamp(instance.getActualEnd().getMillis())); + } + if (instance.getActualStart() != null) { + instanceBean.setActualStartTime(new Timestamp(instance.getActualStart().getMillis())); + } + if (instanceState.getCurrentState() != null) { + instanceBean.setCurrentState(instanceState.getCurrentState().toString()); + } + if (instance.getExternalID() != null) { + instanceBean.setExternalID(instanceState.getInstance().getExternalID()); + } + + instanceBean.setCluster(instance.getCluster()); + instanceBean.setCreationTime(new Timestamp(instance.getCreationTime().getMillis())); + instanceBean.setId(instance.getId().toString()); + instanceBean.setInstanceTime(new Timestamp(instance.getInstanceTime().getMillis())); + instanceBean.setEntityId(new InstanceID(instance).getEntityID().toString()); + + instanceBean.setInstanceSequence(instance.getInstanceSequence()); + if (instance.getAwaitingPredicates() != null && !instance.getAwaitingPredicates().isEmpty()) { + ObjectOutputStream out = null; + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try { + out = new ObjectOutputStream(byteArrayOutputStream); + out.writeInt(instance.getAwaitingPredicates().size()); + for (Predicate predicate : instance.getAwaitingPredicates()) { + out.writeObject(predicate); + } + instanceBean.setAwaitedPredicates(byteArrayOutputStream.toByteArray()); + } finally { + IOUtils.closeQuietly(out); + } + } + return instanceBean; + } + + /** + * Converts instance entry of DB to instance of ExecutionInstance. + * @param instanceBean + * @return + * @throws StateStoreException + * @throws IOException + */ + public static InstanceState convertToInstanceState(InstanceBean instanceBean) throws StateStoreException, + IOException { + EntityType entityType = InstanceID.getEntityType(instanceBean.getId()); + ExecutionInstance executionInstance = getExecutionInstance(entityType, instanceBean); + if (instanceBean.getActualEndTime() != null) { + executionInstance.setActualEnd(new DateTime(instanceBean.getActualEndTime().getTime())); + } + if (instanceBean.getActualStartTime() != null) { + executionInstance.setActualStart(new DateTime(instanceBean.getActualStartTime().getTime())); + } + executionInstance.setExternalID(instanceBean.getExternalID()); + executionInstance.setInstanceSequence(instanceBean.getInstanceSequence()); + + byte[] result = instanceBean.getAwaitedPredicates(); + List<Predicate> predicates = new ArrayList<>(); + if (result != null && result.length != 0) { + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(result); + ObjectInputStream in = null; + try { + in = new ObjectInputStream(byteArrayInputStream); + int length = in.readInt(); + for (int i = 0; i < length; i++) { + Predicate predicate = (Predicate) in.readObject(); + predicates.add(predicate); + } + } catch (ClassNotFoundException e) { + throw new IOException(e); + } finally { + IOUtils.closeQuietly(in); + } + } + executionInstance.setAwaitingPredicates(predicates); + InstanceState instanceState = new InstanceState(executionInstance); + instanceState.setCurrentState(InstanceState.STATE.valueOf(instanceBean.getCurrentState())); + return instanceState; + } + + /** + * Converting list of instance entries of DB to instance of ExecutionInstance. + * @param instanceBeanList + * @return + * @throws StateStoreException + * @throws IOException + */ + public static Collection<InstanceState> convertToInstanceState(List<InstanceBean> instanceBeanList) + throws StateStoreException, IOException { + List<InstanceState> instanceStates = new ArrayList<>(); + for (InstanceBean instanceBean : instanceBeanList) { + instanceStates.add(convertToInstanceState(instanceBean)); + } + return instanceStates; + } + + private static ExecutionInstance getExecutionInstance(EntityType entityType, + InstanceBean instanceBean) throws StateStoreException { + try { + Entity entity = EntityUtil.getEntity(entityType, InstanceID.getEntityName(instanceBean.getId())); + return getExecutionInstance(entityType, entity, instanceBean.getInstanceTime().getTime(), + instanceBean.getCluster(), instanceBean.getCreationTime().getTime()); + } catch (FalconException e) { + throw new StateStoreException(e); + } + } + + public static ExecutionInstance getExecutionInstance(EntityType entityType, Entity entity, long instanceTime, + String cluster, long creationTime) throws StateStoreException { + if (entityType == EntityType.PROCESS) { + try { + return new ProcessExecutionInstance((org.apache.falcon.entity.v0.process.Process) entity, + new DateTime(instanceTime), cluster, new DateTime(creationTime)); + } catch (FalconException e) { + throw new StateStoreException("Entity not found"); + } + } else { + throw new UnsupportedOperationException("Not supported for entity type " + entityType.toString()); + } + } + + + public static byte[] getAwaitedPredicates(InstanceState instanceState) throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ObjectOutputStream out = null; + try { + out = new ObjectOutputStream(byteArrayOutputStream); + out.writeInt(instanceState.getInstance().getAwaitingPredicates().size()); + for (Predicate predicate : instanceState.getInstance().getAwaitingPredicates()) { + out.writeObject(predicate); + } + return byteArrayOutputStream.toByteArray(); + } finally { + IOUtils.closeQuietly(out); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java new file mode 100644 index 0000000..03ada39 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state.store.jdbc; + +import org.apache.openjpa.persistence.jdbc.Index; + +import javax.persistence.Basic; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.validation.constraints.NotNull; +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** + * Entity object which will be stored in Data Base. + */ +@Entity +@NamedQueries({ + @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"), + @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"), + @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"), + @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"), + @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"), + @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"), + @NamedQuery(name = "DELETE_ENTITIES", query = "delete from EntityBean")}) +//RESUME CHECKSTYLE CHECK LineLengthCheck +@Table(name = "ENTITIES") +public class EntityBean { + @NotNull + @Id + private String id; + + @Basic + @NotNull + @Column(name = "name") + private String name; + + + @Basic + @Index + @NotNull + @Column(name = "type") + private String type; + + @Basic + @Index + @NotNull + @Column(name = "current_state") + private String state; + + public EntityBean() { + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + +} + http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java new file mode 100644 index 0000000..0e3dfa9 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state.store.jdbc; + +import org.apache.openjpa.persistence.jdbc.Index; + +import javax.persistence.Basic; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Lob; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.validation.constraints.NotNull; +import java.sql.Timestamp; + +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** + * Instance State which will be stored in DB. + */ +@Entity +@NamedQueries({ + @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"), + @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"), + @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"), + @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates where a.id = :id"), + @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"), + @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"), + @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"), + @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"), + @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"), + @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a") +}) +//RESUME CHECKSTYLE CHECK LineLengthCheck +@Table(name = "INSTANCES") +public class InstanceBean { + + @Id + @NotNull + private String id; + + @Basic + @Index + @NotNull + @Column(name = "entity_id") + private String entityId; + + @Basic + @Index + @NotNull + @Column(name = "cluster") + private String cluster; + + @Basic + @Index + @Column(name = "external_id") + private String externalID; + + @Basic + @Index + @Column(name = "instance_time") + private Timestamp instanceTime; + + @Basic + @Index + @NotNull + @Column(name = "creation_time") + private Timestamp creationTime; + + @Basic + @Column(name = "actual_start_time") + private Timestamp actualStartTime; + + @Basic + @Column(name = "actual_end_time") + private Timestamp actualEndTime; + + @Basic + @Index + @NotNull + @Column(name = "current_state") + private String currentState; + + @Basic + @Index + @NotNull + @Column(name = "instance_sequence") + private Integer instanceSequence; + + + @Column(name = "awaited_predicates", columnDefinition = "BLOB") + @Lob + private byte[] awaitedPredicates; + + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getCluster() { + return cluster; + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + + public String getExternalID() { + return externalID; + } + + public void setExternalID(String externalID) { + this.externalID = externalID; + } + + public Timestamp getInstanceTime() { + return instanceTime; + } + + public void setInstanceTime(Timestamp instanceTime) { + this.instanceTime = instanceTime; + } + + public Timestamp getCreationTime() { + return creationTime; + } + + public void setCreationTime(Timestamp creationTime) { + this.creationTime = creationTime; + } + + public Timestamp getActualStartTime() { + return actualStartTime; + } + + public void setActualStartTime(Timestamp actualStartTime) { + this.actualStartTime = actualStartTime; + } + + public Timestamp getActualEndTime() { + return actualEndTime; + } + + public void setActualEndTime(Timestamp actualEndTime) { + this.actualEndTime = actualEndTime; + } + + public String getCurrentState() { + return currentState; + } + + public void setCurrentState(String currentState) { + this.currentState = currentState; + } + + public byte[] getAwaitedPredicates() { + return awaitedPredicates; + } + + public void setAwaitedPredicates(byte[] awaitedPredicates) { + this.awaitedPredicates = awaitedPredicates; + } + + public Integer getInstanceSequence() { + return instanceSequence; + } + + public void setInstanceSequence(Integer instanceSequence) { + this.instanceSequence = instanceSequence; + } + + public String getEntityId() { + return entityId; + } + + public void setEntityId(String entityId) { + this.entityId = entityId; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java new file mode 100644 index 0000000..ca65b94 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java @@ -0,0 +1,416 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state.store.jdbc; + +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.exception.StateStoreException; +import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.state.EntityClusterID; +import org.apache.falcon.state.EntityID; +import org.apache.falcon.state.EntityState; +import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceID; +import org.apache.falcon.state.InstanceState; +import org.apache.falcon.state.store.AbstractStateStore; +import org.apache.falcon.state.store.StateStore; +import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.util.StartupProperties; +import org.joda.time.DateTime; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.io.IOException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Persistent Data Store for Entities and Instances. + */ +public final class JDBCStateStore extends AbstractStateStore { + + private static final StateStore STORE = new JDBCStateStore(); + private static final String DEBUG = "debug"; + + private JDBCStateStore() {} + + public static StateStore get() { + return STORE; + } + + @Override + public void clear() throws StateStoreException { + if (!isModeDebug()) { + throw new UnsupportedOperationException("Clear Method not supported"); + } + deleteExecutionInstances(); + deleteEntities(); + } + + @Override + public void putEntity(EntityState entityState) throws StateStoreException { + EntityID entityID = new EntityID(entityState.getEntity()); + String key = entityID.getKey(); + if (entityExists(entityID)) { + throw new StateStoreException("Entity with key, " + key + " already exists."); + } + EntityBean entityBean = BeanMapperUtil.convertToEntityBean(entityState); + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + entityManager.persist(entityBean); + commitAndCloseTransaction(entityManager); + } + + + @Override + public EntityState getEntity(EntityID entityID) throws StateStoreException { + EntityState entityState = getEntityByKey(entityID); + if (entityState == null) { + throw new StateStoreException("Entity with key, " + entityID + " does not exist."); + } + return entityState; + } + + private EntityState getEntityByKey(EntityID id) throws StateStoreException { + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_ENTITY"); + q.setParameter("id", id.getKey()); + List result = q.getResultList(); + if (result.isEmpty()) { + return null; + } + entityManager.close(); + return BeanMapperUtil.convertToEntityState((EntityBean) result.get(0)); + } + + @Override + public boolean entityExists(EntityID entityID) throws StateStoreException { + return getEntityByKey(entityID) == null ? false : true; + } + + @Override + public Collection<Entity> getEntities(EntityState.STATE state) throws StateStoreException { + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_ENTITY_FOR_STATE"); + q.setParameter("state", state.toString()); + List result = q.getResultList(); + entityManager.close(); + return BeanMapperUtil.convertToEntities(result); + } + + @Override + public Collection<EntityState> getAllEntities() throws StateStoreException { + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_ENTITIES"); + List result = q.getResultList(); + entityManager.close(); + return BeanMapperUtil.convertToEntityState(result); + } + + @Override + public void updateEntity(EntityState entityState) throws StateStoreException { + EntityID entityID = new EntityID(entityState.getEntity()); + if (!entityExists(entityID)) { + throw new StateStoreException("Entity with key, " + entityID + " doesn't exists."); + } + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery("UPDATE_ENTITY"); + q.setParameter("id", entityID.getKey()); + if (entityState.getCurrentState() != null) { + q.setParameter("state", entityState.getCurrentState().toString()); + } + q.setParameter("type", entityState.getEntity().getEntityType().toString()); + q.setParameter("name", entityState.getEntity().getName()); + q.executeUpdate(); + commitAndCloseTransaction(entityManager); + } + + @Override + public void deleteEntity(EntityID entityID) throws StateStoreException { + if (!entityExists(entityID)) { + throw new StateStoreException("Entity with key, " + entityID.getKey() + " does not exist."); + } + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery("DELETE_ENTITY"); + q.setParameter("id", entityID.getKey()); + q.executeUpdate(); + commitAndCloseTransaction(entityManager); + } + + @Override + public void deleteEntities() throws StateStoreException { + if (!isModeDebug()) { + throw new UnsupportedOperationException("Delete Entities Table not supported"); + } + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery("DELETE_ENTITIES"); + q.executeUpdate(); + commitAndCloseTransaction(entityManager); + } + + @Override + public void putExecutionInstance(InstanceState instanceState) throws StateStoreException { + InstanceID instanceID = new InstanceID(instanceState.getInstance()); + if (executionInstanceExists(instanceID)) { + throw new StateStoreException("Instance with key, " + instanceID + " already exists."); + } + try { + InstanceBean instanceBean = BeanMapperUtil.convertToInstanceBean(instanceState); + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + entityManager.persist(instanceBean); + commitAndCloseTransaction(entityManager); + } catch (IOException e) { + throw new StateStoreException(e); + } + } + + @Override + public InstanceState getExecutionInstance(InstanceID instanceId) throws StateStoreException { + InstanceState instanceState = getExecutionInstanceByKey(instanceId); + if (instanceState == null) { + throw new StateStoreException("Instance with key, " + instanceId.toString() + " does not exist."); + } + return instanceState; + } + + private InstanceState getExecutionInstanceByKey(ID instanceKey) throws StateStoreException { + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_INSTANCE"); + q.setParameter("id", instanceKey.toString()); + List result = q.getResultList(); + entityManager.close(); + if (result.isEmpty()) { + return null; + } + try { + InstanceBean instanceBean = (InstanceBean)(result.get(0)); + return BeanMapperUtil.convertToInstanceState(instanceBean); + } catch (IOException e) { + throw new StateStoreException(e); + } + } + + @Override + public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException { + InstanceID id = new InstanceID(instanceState.getInstance()); + String key = id.toString(); + if (!executionInstanceExists(id)) { + throw new StateStoreException("Instance with key, " + key + " does not exist."); + } + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery("UPDATE_INSTANCE"); + ExecutionInstance instance = instanceState.getInstance(); + q.setParameter("id", key); + q.setParameter("cluster", instance.getCluster()); + q.setParameter("externalID", instance.getExternalID()); + q.setParameter("instanceTime", new Timestamp(instance.getInstanceTime().getMillis())); + q.setParameter("creationTime", new Timestamp(instance.getCreationTime().getMillis())); + if (instance.getActualEnd() != null) { + q.setParameter("actualEndTime", new Timestamp(instance.getActualEnd().getMillis())); + } + q.setParameter("currentState", instanceState.getCurrentState().toString()); + if (instance.getActualStart() != null) { + q.setParameter("actualStartTime", new Timestamp(instance.getActualStart().getMillis())); + } + q.setParameter("instanceSequence", instance.getInstanceSequence()); + if (instanceState.getInstance().getAwaitingPredicates() != null + && !instanceState.getInstance().getAwaitingPredicates().isEmpty()) { + try { + q.setParameter("awaitedPredicates", BeanMapperUtil.getAwaitedPredicates(instanceState)); + } catch (IOException e) { + throw new StateStoreException(e); + } + } + q.executeUpdate(); + commitAndCloseTransaction(entityManager); + } + + @Override + public Collection<InstanceState> getAllExecutionInstances(Entity entity, String cluster) + throws StateStoreException { + EntityClusterID id = new EntityClusterID(entity, cluster); + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_CLUSTER"); + q.setParameter("entityId", id.getEntityID().getKey()); + q.setParameter("cluster", cluster); + List result = q.getResultList(); + entityManager.close(); + try { + return BeanMapperUtil.convertToInstanceState(result); + } catch (IOException e) { + throw new StateStoreException(e); + } + } + + @Override + public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster, + Collection<InstanceState.STATE> states) + throws StateStoreException { + EntityClusterID entityClusterID = new EntityClusterID(entity, cluster); + String entityKey = entityClusterID.getEntityID().getKey(); + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES"); + q.setParameter("entityId", entityKey); + q.setParameter("cluster", cluster); + List<String> instanceStates = new ArrayList<>(); + for (InstanceState.STATE state : states) { + instanceStates.add(state.toString()); + } + q.setParameter("currentState", instanceStates); + List result = q.getResultList(); + entityManager.close(); + try { + return BeanMapperUtil.convertToInstanceState(result); + } catch (IOException e) { + throw new StateStoreException(e); + } + } + + @Override + public Collection<InstanceState> getExecutionInstances(EntityClusterID id, + Collection<InstanceState.STATE> states) + throws StateStoreException { + String entityKey = id.getEntityID().getKey(); + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_FOR_STATES"); + q.setParameter("entityId", entityKey); + List<String> instanceStates = new ArrayList<>(); + for (InstanceState.STATE state : states) { + instanceStates.add(state.toString()); + } + q.setParameter("currentState", instanceStates); + List result = q.getResultList(); + entityManager.close(); + try { + return BeanMapperUtil.convertToInstanceState(result); + } catch (IOException e) { + throw new StateStoreException(e); + } + } + + @Override + public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster, + Collection<InstanceState.STATE> states, DateTime start, + DateTime end) throws StateStoreException { + String entityKey = new EntityClusterID(entity, cluster).getEntityID().getKey(); + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE"); + q.setParameter("entityId", entityKey); + List<String> instanceStates = new ArrayList<>(); + for (InstanceState.STATE state : states) { + instanceStates.add(state.toString()); + } + q.setParameter("currentState", instanceStates); + q.setParameter("startTime", new Timestamp(start.getMillis())); + q.setParameter("endTime", new Timestamp(end.getMillis())); + List result = q.getResultList(); + entityManager.close(); + try { + return BeanMapperUtil.convertToInstanceState(result); + } catch (IOException e) { + throw new StateStoreException(e); + } + } + + @Override + public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException { + String key = new EntityClusterID(entity, cluster).getEntityID().getKey(); + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery("GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER"); + q.setParameter("entityId", key); + q.setParameter("cluster", cluster); + q.setMaxResults(1); + List result = q.getResultList(); + entityManager.close(); + if (!result.isEmpty()) { + try { + return BeanMapperUtil.convertToInstanceState((InstanceBean) result.get(0)); + } catch (IOException e) { + throw new StateStoreException(e); + } + } + return null; + } + + @Override + public boolean executionInstanceExists(InstanceID instanceKey) throws StateStoreException { + return getExecutionInstanceByKey(instanceKey) == null ? false : true; + } + + @Override + public void deleteExecutionInstance(InstanceID instanceID) throws StateStoreException { + String instanceKey = instanceID.toString(); + if (!executionInstanceExists(instanceID)) { + throw new StateStoreException("Instance with key, " + instanceKey + " does not exist."); + } + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery("DELETE_INSTANCE"); + q.setParameter("id", instanceKey); + q.executeUpdate(); + commitAndCloseTransaction(entityManager); + } + + @Override + public void deleteExecutionInstances(EntityID entityID) { + String entityKey = entityID.getKey(); + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery("DELETE_INSTANCE_FOR_ENTITY"); + q.setParameter("entityId", entityKey); + q.executeUpdate(); + commitAndCloseTransaction(entityManager); + } + + @Override + public void deleteExecutionInstances() { + if (!isModeDebug()) { + throw new UnsupportedOperationException("Delete Instances Table not supported"); + } + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery("DELETE_INSTANCES_TABLE"); + q.executeUpdate(); + commitAndCloseTransaction(entityManager); + } + + // Debug enabled for test cases + private boolean isModeDebug() { + return DEBUG.equals(StartupProperties.get().getProperty("domain")) ? true : false; + } + + private void commitAndCloseTransaction(EntityManager entityManager) { + entityManager.getTransaction().commit(); + entityManager.close(); + } + + private void beginTransaction(EntityManager entityManager) { + entityManager.getTransaction().begin(); + } + + private EntityManager getEntityManager() { + return FalconJPAService.get().getEntityManager(); + } + +}
