FALCON-1865 Persist feed SLA data to database Author: Praveen Adlakha <[email protected]>
Reviewers: Ajay Yadava <[email protected]> Closes #77 from PraveenAdlakha/feed_alert Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/de2f5c0a Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/de2f5c0a Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/de2f5c0a Branch: refs/heads/master Commit: de2f5c0ab1c26b8d198d067e066c579a86bce737 Parents: 10f3843 Author: Praveen Adlakha <[email protected]> Authored: Mon Mar 28 19:08:56 2016 +0530 Committer: Ajay Yadava <[email protected]> Committed: Mon Mar 28 19:08:56 2016 +0530 ---------------------------------------------------------------------- common/pom.xml | 41 ++ .../apache/falcon/persistence/EntityBean.java | 117 +++++ .../apache/falcon/persistence/InstanceBean.java | 229 ++++++++++ .../falcon/persistence/MonitoredFeedsBean.java | 73 ++++ .../falcon/persistence/PendingInstanceBean.java | 98 +++++ .../persistence/PersistenceConstants.java | 35 ++ .../persistence/ResultNotFoundException.java | 31 ++ .../apache/falcon/service/FalconJPAService.java | 170 +++++++ .../falcon/tools/FalconStateStoreDBCLI.java | 438 +++++++++++++++++++ .../src/main/resources/META-INF/persistence.xml | 113 +++++ common/src/main/resources/startup.properties | 7 +- .../src/main/resources/statestore.credentials | 4 +- common/src/main/resources/statestore.properties | 20 +- docs/src/site/twiki/FalconNativeScheduler.twiki | 2 +- .../falcon/jdbc/MonitoringJdbcStateStore.java | 175 ++++++++ .../service/FeedSLAMonitoringService.java | 191 +++----- .../jdbc/MonitoringJdbcStateStoreTest.java | 97 ++++ .../falcon/service/FeedSLAMonitoringTest.java | 34 -- scheduler/pom.xml | 42 +- .../falcon/state/store/jdbc/BeanMapperUtil.java | 2 + .../falcon/state/store/jdbc/EntityBean.java | 117 ----- .../falcon/state/store/jdbc/InstanceBean.java | 229 ---------- .../falcon/state/store/jdbc/JDBCStateStore.java | 4 +- .../state/store/service/FalconJPAService.java | 171 -------- .../falcon/tools/FalconStateStoreDBCLI.java | 436 ------------------ .../src/main/resources/META-INF/persistence.xml | 104 ----- .../execution/FalconExecutionServiceTest.java | 2 +- .../falcon/state/AbstractSchedulerTestBase.java | 2 +- .../state/service/TestFalconJPAService.java | 2 +- .../state/service/store/TestJDBCStateStore.java | 2 +- scheduler/src/test/resources/startup.properties | 5 +- .../src/test/resources/statestore.properties | 2 +- src/build/findbugs-exclude.xml | 25 +- src/conf/startup.properties | 2 +- unit/pom.xml | 10 + unit/src/main/resources/startup.properties | 1 + .../org/apache/falcon/unit/TestFalconUnit.java | 1 + .../AbstractSchedulerManagerJerseyIT.java | 2 +- webapp/src/test/resources/startup.properties | 2 +- 39 files changed, 1744 insertions(+), 1294 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/pom.xml ---------------------------------------------------------------------- diff --git a/common/pom.xml b/common/pom.xml index df28f9b..c54f9d8 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -187,6 +187,26 @@ <groupId>com.thinkaurelius.titan</groupId> <artifactId>titan-berkeleyje</artifactId> </dependency> + + <dependency> + <groupId>org.apache.openjpa</groupId> + <artifactId>openjpa-jdbc</artifactId> + <version>${openjpa.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.openjpa</groupId> + <artifactId>openjpa-persistence-jdbc</artifactId> + 
<version>${openjpa.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>javax.validation</groupId> + <artifactId>validation-api</artifactId> + <version>${javax-validation.version}</version> + </dependency> </dependencies> <build> @@ -216,6 +236,27 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.8</version> + <executions> + <execution> + <phase>process-classes</phase> + <configuration> + <tasks> + <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask" classpathref="maven.compile.classpath"/> + <openjpac> + <classpath refid="maven.compile.classpath"/> + </openjpac> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java new file mode 100644 index 0000000..5c94fa4 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.persistence; + +import org.apache.openjpa.persistence.jdbc.Index; + +import javax.persistence.Basic; +import javax.persistence.CascadeType; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.OneToMany; +import javax.persistence.Table; +import javax.validation.constraints.NotNull; +import java.util.List; +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** + * Entity object which will be stored in Data Base. 
+ */ +@Entity +@NamedQueries({ + @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"), + @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"), + @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"), + @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"), + @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"), + @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"), + @NamedQuery(name = "DELETE_ENTITIES", query = "delete from EntityBean")}) +//RESUME CHECKSTYLE CHECK LineLengthCheck +@Table(name = "ENTITIES") +public class EntityBean { + @NotNull + @Id + private String id; + + @Basic + @NotNull + @Column(name = "name") + private String name; + + + @Basic + @Index + @NotNull + @Column(name = "type") + private String type; + + @Basic + @Index + @NotNull + @Column(name = "current_state") + private String state; + + @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean") + private List<InstanceBean> instanceBeans; + + public EntityBean() { + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + public List<InstanceBean> getInstanceBeans() { + return instanceBeans; + } + + public void setInstanceBeans(List<InstanceBean> instanceBeans) { + this.instanceBeans = instanceBeans; + } +} + http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java new file mode 100644 index 0000000..b7e10f1 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.persistence; + +import org.apache.openjpa.persistence.jdbc.ForeignKey; +import org.apache.openjpa.persistence.jdbc.ForeignKeyAction; +import org.apache.openjpa.persistence.jdbc.Index; + +import javax.persistence.Basic; +import javax.persistence.CascadeType; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Lob; +import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.validation.constraints.NotNull; +import java.sql.Timestamp; + +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** + * Instance State which will be stored in DB. + */ +@Entity +@NamedQueries({ + @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"), + @NamedQuery(name = "GET_INSTANCE_FOR_EXTERNAL_ID", query = "select OBJECT(a) from InstanceBean a where a.externalID = :externalID"), + @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"), + @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"), + @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates, a.properties = :properties where a.id = :id"), + @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"), + @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"), + @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"), + @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"), + @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"), + @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"), + @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState") +}) +//RESUME CHECKSTYLE CHECK LineLengthCheck +@Table(name = "INSTANCES") +public class InstanceBean { + + @Id + @NotNull + private String id; + + @Basic + @Index + @NotNull + @Column(name = "entity_id") + private String entityId; + + @Basic + @Index + @NotNull + @Column(name = "cluster") + private String cluster; + + @Basic + @Index + @Column(name = "external_id") + private String externalID; + + @Basic + @Index + @Column(name = "instance_time") + private Timestamp instanceTime; + + @Basic + @Index + @NotNull + @Column(name = 
"creation_time") + private Timestamp creationTime; + + @Basic + @Column(name = "actual_start_time") + private Timestamp actualStartTime; + + @Basic + @Column(name = "actual_end_time") + private Timestamp actualEndTime; + + @Basic + @Index + @NotNull + @Column(name = "current_state") + private String currentState; + + @Basic + @Index + @NotNull + @Column(name = "instance_sequence") + private Integer instanceSequence; + + @ForeignKey(deleteAction= ForeignKeyAction.CASCADE) + @ManyToOne(cascade= CascadeType.REMOVE) + private EntityBean entityBean; + + + @Column(name = "awaited_predicates") + @Lob + private byte[] awaitedPredicates; + + @Column(name = "properties") + @Lob + private byte[] properties; + + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getCluster() { + return cluster; + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + + public String getExternalID() { + return externalID; + } + + public void setExternalID(String externalID) { + this.externalID = externalID; + } + + public Timestamp getInstanceTime() { + return instanceTime; + } + + public void setInstanceTime(Timestamp instanceTime) { + this.instanceTime = instanceTime; + } + + public Timestamp getCreationTime() { + return creationTime; + } + + public void setCreationTime(Timestamp creationTime) { + this.creationTime = creationTime; + } + + public Timestamp getActualStartTime() { + return actualStartTime; + } + + public void setActualStartTime(Timestamp actualStartTime) { + this.actualStartTime = actualStartTime; + } + + public Timestamp getActualEndTime() { + return actualEndTime; + } + + public void setActualEndTime(Timestamp actualEndTime) { + this.actualEndTime = actualEndTime; + } + + public String getCurrentState() { + return currentState; + } + + public void setCurrentState(String currentState) { + this.currentState = currentState; + } + + public byte[] getAwaitedPredicates() { + return awaitedPredicates; + } + + public void setAwaitedPredicates(byte[] awaitedPredicates) { + this.awaitedPredicates = awaitedPredicates; + } + + public Integer getInstanceSequence() { + return instanceSequence; + } + + public void setInstanceSequence(Integer instanceSequence) { + this.instanceSequence = instanceSequence; + } + + public String getEntityId() { + return entityId; + } + + public void setEntityId(String entityId) { + this.entityId = entityId; + } + + public byte[] getProperties() { + return properties; + } + + public void setProperties(byte[] properties) { + this.properties = properties; + } + + public EntityBean getEntityBean() { + return entityBean; + } + + public void setEntityBean(EntityBean entityBean) { + this.entityBean = entityBean; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java new file mode 100644 index 0000000..2b48569 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.persistence; + +import javax.persistence.Entity; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Column; +import javax.persistence.Basic; +import javax.validation.constraints.NotNull; + +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** +* The Feeds that are to be monitered will be stored in the db. +* */ + +@Entity +@NamedQueries({ + @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from " + + "MonitoredFeedsBean a where a.feedName = :feedName"), + @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredFeedsBean " + + "a where a.feedName = :feedName"), + @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) " + + "from MonitoredFeedsBean a") +}) +@Table(name="MONITORED_FEEDS") +//RESUME CHECKSTYLE CHECK LineLengthCheck +public class MonitoredFeedsBean { + @NotNull + @GeneratedValue(strategy = GenerationType.AUTO) + @Id + private String id; + + @Basic + @NotNull + @Column(name = "feed_name") + private String feedName; + + public String getFeedName() { + return feedName; + } + + public void setFeedName(String feedName) { + this.feedName = feedName; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java new file mode 100644 index 0000000..038244a --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.persistence; + +import javax.persistence.Entity; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.persistence.GenerationType; +import javax.persistence.GeneratedValue; +import javax.persistence.Id; +import javax.persistence.Basic; +import javax.persistence.Column; +import javax.validation.constraints.NotNull; +import java.util.Date; + +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** +* The instances of feed to be monitored will be stored in db. +* */ +@Entity +@NamedQueries({ + @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"), + @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"), + @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), + @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), + @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a ") +}) +@Table(name = "PENDING_INSTANCES") +//RESUME CHECKSTYLE CHECK LineLengthCheck +public class PendingInstanceBean { + @NotNull + @GeneratedValue(strategy = GenerationType.AUTO) + @Id + private String id; + + @Basic + @NotNull + @Column(name = "feed_name") + private String feedName; + + @Basic + @NotNull + @Column(name = "cluster_name") + private String clusterName; + + @Basic + @NotNull + @Column(name = "nominal_time") + private Date nominalTime; + + public Date getNominalTime() { + return nominalTime; + } + + public void setNominalTime(Date nominalTime) { + this.nominalTime = nominalTime; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getFeedName() { + return feedName; + } + + public void setFeedName(String feedName) { + this.feedName = feedName; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java new file mode 100644 index 0000000..511270e --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.persistence; +/** + * The name of queries to be used as constants accross the packages. + */ + +public final class PersistenceConstants { + private PersistenceConstants(){ + + } + public static final String GET_MONITERED_INSTANCE = "GET_MONITERED_INSTANCE"; + public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES"; + public static final String GET_ALL_MONITORING_FEEDS = "GET_ALL_MONITORING_FEEDS"; + public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES"; + public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES"; + public static final String DELETE_ALL_INSTANCES_FOR_FEED = "DELETE_ALL_INSTANCES_FOR_FEED"; + public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES"; + public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES"; +} http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java b/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java new file mode 100644 index 0000000..c368d2c --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.persistence; + +import org.apache.falcon.FalconException; + +/** + * Exception to be through by the bean classes. 
+ */ +public class ResultNotFoundException extends FalconException { + + public ResultNotFoundException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/service/FalconJPAService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/service/FalconJPAService.java b/common/src/main/java/org/apache/falcon/service/FalconJPAService.java new file mode 100644 index 0000000..73fde33 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/service/FalconJPAService.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.service; + +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.persistence.EntityBean; +import org.apache.falcon.persistence.InstanceBean; +import org.apache.falcon.util.StateStoreProperties; +import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.Persistence; +import java.text.MessageFormat; +import java.util.Properties; + +/** + * Service that manages JPA. 
+ */ +public final class FalconJPAService implements FalconService { + + private static final Logger LOG = LoggerFactory.getLogger(FalconJPAService.class); + public static final String PREFIX = "falcon.statestore."; + + public static final String DB_SCHEMA = PREFIX + "schema.name"; + public static final String URL = PREFIX + "jdbc.url"; + public static final String DRIVER = PREFIX + "jdbc.driver"; + public static final String USERNAME = PREFIX + "jdbc.username"; + public static final String PASSWORD = PREFIX + "jdbc.password"; + public static final String CONN_DATA_SOURCE = PREFIX + "connection.data.source"; + public static final String CONN_PROPERTIES = PREFIX + "connection.properties"; + public static final String MAX_ACTIVE_CONN = PREFIX + "pool.max.active.conn"; + public static final String CREATE_DB_SCHEMA = PREFIX + "create.db.schema"; + public static final String VALIDATE_DB_CONN = PREFIX + "validate.db.connection"; + public static final String VALIDATE_DB_CONN_EVICTION_INTERVAL = PREFIX + "validate.db.connection.eviction.interval"; + public static final String VALIDATE_DB_CONN_EVICTION_NUM = PREFIX + "validate.db.connection.eviction.num"; + + private EntityManagerFactory entityManagerFactory; + // Persistent Unit which is defined in persistence.xml + private String persistenceUnit; + private static final FalconJPAService FALCON_JPA_SERVICE = new FalconJPAService(); + + private FalconJPAService() { + } + + public static FalconJPAService get() { + return FALCON_JPA_SERVICE; + } + + public EntityManagerFactory getEntityManagerFactory() { + return entityManagerFactory; + } + + public void setPersistenceUnit(String dbType) { + if (StringUtils.isEmpty(dbType)) { + throw new IllegalArgumentException(" DB type cannot be null or empty"); + } + dbType = dbType.split(":")[0]; + this.persistenceUnit = "falcon-" + dbType; + } + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public void init() throws FalconException { + Properties props = getPropsforStore(); + entityManagerFactory = Persistence. 
+ createEntityManagerFactory(persistenceUnit, props); + EntityManager entityManager = getEntityManager(); + entityManager.find(EntityBean.class, 1); + entityManager.find(InstanceBean.class, 1); + LOG.info("All entities initialized"); + + // need to use a pseudo no-op transaction so all entities, datasource + // and connection pool are initialized one time only + entityManager.getTransaction().begin(); + OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) entityManagerFactory; + // Mask the password with '***' + String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,"); + LOG.info("JPA configuration: {0}", logMsg); + entityManager.getTransaction().commit(); + entityManager.close(); + } + + private Properties getPropsforStore() throws FalconException { + String dbSchema = StateStoreProperties.get().getProperty(DB_SCHEMA); + String url = StateStoreProperties.get().getProperty(URL); + String driver = StateStoreProperties.get().getProperty(DRIVER); + String user = StateStoreProperties.get().getProperty(USERNAME); + String password = StateStoreProperties.get().getProperty(PASSWORD).trim(); + String maxConn = StateStoreProperties.get().getProperty(MAX_ACTIVE_CONN).trim(); + String dataSource = StateStoreProperties.get().getProperty(CONN_DATA_SOURCE); + String connPropsConfig = StateStoreProperties.get().getProperty(CONN_PROPERTIES); + boolean autoSchemaCreation = Boolean.parseBoolean(StateStoreProperties.get().getProperty(CREATE_DB_SCHEMA, + "false")); + boolean validateDbConn = Boolean.parseBoolean(StateStoreProperties.get().getProperty(VALIDATE_DB_CONN, "true")); + String evictionInterval = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim(); + String evictionNum = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim(); + + if (!url.startsWith("jdbc:")) { + throw new FalconException("invalid JDBC URL, must start with 'jdbc:'" + url); + } + String dbType = url.substring("jdbc:".length()); + if (dbType.indexOf(":") <= 0) { + throw new FalconException("invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'" + url); + } + setPersistenceUnit(dbType); + String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}"; + connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn); + Properties props = new Properties(); + if (autoSchemaCreation) { + connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; + props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)"); + } else if (validateDbConn) { + // validation can be done only if the schema already exist, else a + // connection cannot be obtained to create the schema. 
+ String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval; + String num = "numTestsPerEvictionRun=" + evictionNum; + connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num; + connProps += ",ValidationQuery=select 1"; + connProps = MessageFormat.format(connProps, dbSchema); + } else { + connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; + } + if (connPropsConfig != null) { + connProps += "," + connPropsConfig; + } + props.setProperty("openjpa.ConnectionProperties", connProps); + props.setProperty("openjpa.ConnectionDriverName", dataSource); + return props; + } + + @Override + public void destroy() throws FalconException { + if (entityManagerFactory.isOpen()) { + entityManagerFactory.close(); + } + } + + + /** + * Return an EntityManager. Used by the StoreService. + * + * @return an entity manager + */ + public EntityManager getEntityManager() { + return getEntityManagerFactory().createEntityManager(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java new file mode 100644 index 0000000..df8194c --- /dev/null +++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java @@ -0,0 +1,438 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.tools; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.falcon.cli.CLIParser; +import org.apache.falcon.service.FalconJPAService; +import org.apache.falcon.util.BuildProperties; +import org.apache.falcon.util.StateStoreProperties; + +import java.io.File; +import java.io.FileWriter; +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Command Line utility for Table Creation, Update. + */ +public class FalconStateStoreDBCLI { + public static final String HELP_CMD = "help"; + public static final String VERSION_CMD = "version"; + public static final String CREATE_CMD = "create"; + public static final String SQL_FILE_OPT = "sqlfile"; + public static final String RUN_OPT = "run"; + public static final String UPGRADE_CMD = "upgrade"; + + // Represents whether DB instance exists or not. 
+ private boolean instanceExists; + private static final String[] FALCON_HELP = + {"Falcon DB initialization tool currently supports Derby DB/ Mysql/ PostgreSQL"}; + + public static void main(String[] args) { + new FalconStateStoreDBCLI().run(args); + } + + public FalconStateStoreDBCLI() { + instanceExists = false; + } + + protected Options getOptions() { + Option sqlfile = new Option(SQL_FILE_OPT, true, + "Generate SQL script instead of creating/upgrading the DB schema"); + Option run = new Option(RUN_OPT, false, "Confirmation option regarding DB schema creation/upgrade"); + Options options = new Options(); + options.addOption(sqlfile); + options.addOption(run); + return options; + } + + public synchronized int run(String[] args) { + if (instanceExists) { + throw new IllegalStateException("CLI instance already used"); + } + instanceExists = true; + + CLIParser parser = new CLIParser("falcondb", FALCON_HELP); + parser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false); + parser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false); + parser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false); + parser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false); + + try { + CLIParser.Command command = parser.parse(args); + if (command.getName().equals(HELP_CMD)) { + parser.showHelp(); + } else if (command.getName().equals(VERSION_CMD)) { + showVersion(); + } else { + if (!command.getCommandLine().hasOption(SQL_FILE_OPT) + && !command.getCommandLine().hasOption(RUN_OPT)) { + throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified"); + } + CommandLine commandLine = command.getCommandLine(); + String sqlFile = (commandLine.hasOption(SQL_FILE_OPT)) + ? 
commandLine.getOptionValue(SQL_FILE_OPT) + : File.createTempFile("falcondb-", ".sql").getAbsolutePath(); + boolean run = commandLine.hasOption(RUN_OPT); + if (command.getName().equals(CREATE_CMD)) { + createDB(sqlFile, run); + } else if (command.getName().equals(UPGRADE_CMD)) { + upgradeDB(sqlFile, run); + } + System.out.println("The SQL commands have been written to: " + sqlFile); + if (!run) { + System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option"); + } + } + return 0; + } catch (ParseException ex) { + System.err.println("Invalid sub-command: " + ex.getMessage()); + System.err.println(); + System.err.println(parser.shortHelp()); + return 1; + } catch (Exception ex) { + System.err.println(); + System.err.println("Error: " + ex.getMessage()); + System.err.println(); + System.err.println("Stack trace for the error was (for debug purposes):"); + System.err.println("--------------------------------------"); + ex.printStackTrace(System.err); + System.err.println("--------------------------------------"); + System.err.println(); + return 1; + } + } + + private void upgradeDB(String sqlFile, boolean run) throws Exception { + validateConnection(); + if (!checkDBExists()) { + throw new Exception("Falcon DB doesn't exist"); + } + String falconVersion = BuildProperties.get().getProperty("project.version"); + String dbVersion = getFalconDBVersion(); + if (dbVersion.compareTo(falconVersion) >= 0) { + System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'"); + return; + } + + createUpgradeDB(sqlFile, run, false); + upgradeFalconDBVersion(sqlFile, run, falconVersion); + + // any post upgrade tasks + if (run) { + System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'"); + } + } + + + private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception { + String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where name='db.version'"; + PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true)); + writer.println(); + writer.println(updateDBVersion); + writer.close(); + System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version); + if (run) { + Connection conn = createConnection(); + Statement st = null; + try { + conn.setAutoCommit(true); + st = conn.createStatement(); + st.executeUpdate(updateDBVersion); + st.close(); + } catch (Exception ex) { + throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex); + } finally { + closeStatement(st); + conn.close(); + } + } + System.out.println("DONE"); + } + + private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'"; + + private String getFalconDBVersion() throws Exception { + String version; + System.out.println("Get Falcon DB version"); + Connection conn = createConnection(); + Statement st = null; + ResultSet rs = null; + try { + st = conn.createStatement(); + rs = st.executeQuery(GET_FALCON_DB_VERSION); + if (rs.next()) { + version = rs.getString(1); + } else { + throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table"); + } + } catch (Exception ex) { + throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex); + } finally { + closeResultSet(rs); + closeStatement(st); + conn.close(); + } + System.out.println("DONE"); + return version; + } + + + private Map<String, String> getJdbcConf() throws Exception { + 
Map<String, String> jdbcConf = new HashMap<String, String>(); + jdbcConf.put("driver", StateStoreProperties.get().getProperty(FalconJPAService.DRIVER)); + String url = StateStoreProperties.get().getProperty(FalconJPAService.URL); + jdbcConf.put("url", url); + jdbcConf.put("user", StateStoreProperties.get().getProperty(FalconJPAService.USERNAME)); + jdbcConf.put("password", StateStoreProperties.get().getProperty(FalconJPAService.PASSWORD)); + String dbType = url.substring("jdbc:".length()); + if (dbType.indexOf(":") <= 0) { + throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'"); + } + dbType = dbType.substring(0, dbType.indexOf(":")); + jdbcConf.put("dbtype", dbType); + return jdbcConf; + } + + private String[] createMappingToolArguments(String sqlFile) throws Exception { + Map<String, String> conf = getJdbcConf(); + List<String> args = new ArrayList<String>(); + args.add("-schemaAction"); + args.add("add"); + args.add("-p"); + args.add("persistence.xml#falcon-" + conf.get("dbtype")); + args.add("-connectionDriverName"); + args.add(conf.get("driver")); + args.add("-connectionURL"); + args.add(conf.get("url")); + args.add("-connectionUserName"); + args.add(conf.get("user")); + args.add("-connectionPassword"); + args.add(conf.get("password")); + if (sqlFile != null) { + args.add("-sqlFile"); + args.add(sqlFile); + } + args.add("-indexes"); + args.add("true"); + args.add("org.apache.falcon.persistence.EntityBean"); + args.add("org.apache.falcon.persistence.InstanceBean"); + args.add("org.apache.falcon.persistence.PendingInstanceBean"); + args.add("org.apache.falcon.persistence.MonitoredFeedsBean"); + return args.toArray(new String[args.size()]); + } + + private void createDB(String sqlFile, boolean run) throws Exception { + validateConnection(); + if (checkDBExists()) { + return; + } + + verifyFalconPropsTable(false); + createUpgradeDB(sqlFile, run, true); + createFalconPropsTable(sqlFile, run, BuildProperties.get().getProperty("project.version")); + if (run) { + System.out.println("Falcon DB has been created for Falcon version '" + + BuildProperties.get().getProperty("project.version") + "'"); + } + } + + private static final String CREATE_FALCON_DB_PROPS = + "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))"; + + private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception { + String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')"; + + PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true)); + writer.println(); + writer.println(CREATE_FALCON_DB_PROPS); + writer.println(insertDbVerion); + writer.close(); + System.out.println("Create FALCON_DB_PROPS table"); + if (run) { + Connection conn = createConnection(); + Statement st = null; + try { + conn.setAutoCommit(true); + st = conn.createStatement(); + st.executeUpdate(CREATE_FALCON_DB_PROPS); + st.executeUpdate(insertDbVerion); + st.close(); + } catch (Exception ex) { + closeStatement(st); + throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex); + } finally { + conn.close(); + } + } + System.out.println("DONE"); + } + + private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS"; + + private boolean verifyFalconPropsTable(boolean exists) throws Exception { + System.out.println((exists) ? 
"Check FALCON_DB_PROPS table exists" + : "Checking FALCON_DB_PROPS table does not exist"); + boolean tableExists; + Connection conn = createConnection(); + Statement st = null; + ResultSet rs = null; + try { + st = conn.createStatement(); + rs = st.executeQuery(FALCON_DB_PROPS_EXISTS); + rs.next(); + tableExists = true; + } catch (Exception ex) { + tableExists = false; + } finally { + closeResultSet(rs); + closeStatement(st); + conn.close(); + } + if (tableExists != exists) { + throw new Exception("FALCON_DB_PROPS_TABLE table " + ((exists) ? "does not exist" : "exists")); + } + System.out.println("DONE"); + return tableExists; + } + + private void closeResultSet(ResultSet rs) { + try { + if (rs != null) { + rs.close(); + } + } catch (Exception e) { + System.out.println("Unable to close ResultSet " + rs); + } + } + + private void closeStatement(Statement st) throws Exception { + try { + if (st != null) { + st.close(); + } + } catch (Exception e) { + System.out.println("Unable to close SQL Statement " + st); + throw new Exception(e); + } + } + + private Connection createConnection() throws Exception { + Map<String, String> conf = getJdbcConf(); + Class.forName(conf.get("driver")).newInstance(); + return DriverManager.getConnection(conf.get("url"), conf.get("user"), conf.get("password")); + } + + private void validateConnection() throws Exception { + System.out.println("Validating DB Connection"); + try { + createConnection().close(); + System.out.println("DONE"); + } catch (Exception ex) { + throw new Exception("Could not connect to the database: " + ex.toString(), ex); + } + } + + private static final String ENTITY_STATUS_QUERY = + "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')"; + private static final String INSTANCE_STATUS_QUERY = + "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')"; + + private boolean checkDBExists() throws Exception { + boolean schemaExists; + Connection conn = createConnection(); + ResultSet rs = null; + Statement st = null; + try { + st = conn.createStatement(); + rs = st.executeQuery(ENTITY_STATUS_QUERY); + rs.next(); + schemaExists = true; + } catch (Exception ex) { + schemaExists = false; + } finally { + closeResultSet(rs); + closeStatement(st); + conn.close(); + } + System.out.println("DB schema " + ((schemaExists) ? "exists" : "does not exist")); + return schemaExists; + } + + private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception { + System.out.println((create) ? 
"Create SQL schema" : "Upgrade SQL schema"); + String[] args = createMappingToolArguments(sqlFile); + org.apache.openjpa.jdbc.meta.MappingTool.main(args); + if (run) { + args = createMappingToolArguments(null); + org.apache.openjpa.jdbc.meta.MappingTool.main(args); + } + System.out.println("DONE"); + } + + private void showVersion() throws Exception { + System.out.println("Falcon Server version: " + + BuildProperties.get().getProperty("project.version")); + validateConnection(); + if (!checkDBExists()) { + throw new Exception("Falcon DB doesn't exist"); + } + try { + verifyFalconPropsTable(true); + } catch (Exception ex) { + throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool"); + } + showFalconPropsInfo(); + } + + private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name"; + + private void showFalconPropsInfo() throws Exception { + Connection conn = createConnection(); + Statement st = null; + ResultSet rs = null; + try { + System.out.println("Falcon DB Version Information"); + System.out.println("--------------------------------------"); + st = conn.createStatement(); + rs = st.executeQuery(GET_FALCON_PROPS_INFO); + while (rs.next()) { + System.out.println(rs.getString(1) + ": " + rs.getString(2)); + } + System.out.println("--------------------------------------"); + } catch (Exception ex) { + throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex); + } finally { + closeResultSet(rs); + closeStatement(st); + conn.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml new file mode 100644 index 0000000..4c9388c --- /dev/null +++ b/common/src/main/resources/META-INF/persistence.xml @@ -0,0 +1,113 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<persistence xmlns="http://java.sun.com/xml/ns/persistence" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + version="1.0"> + + <persistence-unit name="falcon-derby" transaction-type="RESOURCE_LOCAL"> + <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider> + + <class>org.apache.falcon.persistence.EntityBean</class> + <class>org.apache.falcon.persistence.InstanceBean</class> + <class>org.apache.falcon.persistence.PendingInstanceBean</class> + <class>org.apache.falcon.persistence.MonitoredFeedsBean</class> + + <properties> + <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> + + <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time --> + + <property name="openjpa.MetaDataFactory" + value="jpa(Types=org.apache.falcon.persistence.EntityBean; + org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; + org.apache.falcon.persistence.MonitoredFeedsBean)"></property> + + <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> + <property name="openjpa.LockManager" value="pessimistic"/> + <property name="openjpa.ReadLockLevel" value="read"/> + <property name="openjpa.WriteLockLevel" value="write"/> + <property name="openjpa.jdbc.TransactionIsolation" value="read-committed"/> <!--CUSTOM--> + <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/> + <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/> + <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/> + <property name="openjpa.Log" value="log4j"/> + </properties> + </persistence-unit> + + <persistence-unit name="falcon-mysql" transaction-type="RESOURCE_LOCAL"> + <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider> + + <class>org.apache.falcon.persistence.EntityBean</class> + <class>org.apache.falcon.persistence.InstanceBean</class> + <class>org.apache.falcon.persistence.PendingInstanceBean</class> + <class>org.apache.falcon.persistence.MonitoredFeedsBean</class> + + <properties> + <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> + + <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time --> + + <property name="openjpa.MetaDataFactory" + value="jpa(Types=org.apache.falcon.persistence.EntityBean; + org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; + org.apache.falcon.persistence.MonitoredFeedsBean)"></property> + + <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> + <property name="openjpa.LockManager" value="pessimistic"/> + <property name="openjpa.ReadLockLevel" value="read"/> + <property name="openjpa.WriteLockLevel" value="write"/> + <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM--> + <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/> + <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/> + <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/> + <property name="openjpa.Log" value="log4j"/> + </properties> + </persistence-unit> + + <persistence-unit name="falcon-postgresql" transaction-type="RESOURCE_LOCAL"> + <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider> + + <class>org.apache.falcon.persistence.EntityBean</class> + 
<class>org.apache.falcon.persistence.InstanceBean</class> + <class>org.apache.falcon.persistence.MonitoredFeedsBean</class> + <class>org.apache.falcon.persistence.PendingInstanceBean</class> + + <properties> + <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> + + <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time --> + + <property name="openjpa.MetaDataFactory" + value="jpa(Types=org.apache.falcon.persistence.EntityBean; + org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; + org.apache.falcon.persistence.MonitoredFeedsBean)"></property> + + <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> + <property name="openjpa.LockManager" value="pessimistic"/> + <property name="openjpa.ReadLockLevel" value="read"/> + <property name="openjpa.WriteLockLevel" value="write"/> + <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM--> + <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/> + <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/> + <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/> + <property name="openjpa.Log" value="log4j"/> + </properties> + </persistence-unit> + +</persistence> http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 81d3da1..87a74bf 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -42,7 +42,8 @@ org.apache.falcon.metadata.MetadataMappingService,\ org.apache.falcon.service.LogCleanupService,\ org.apache.falcon.service.GroupsService,\ - org.apache.falcon.service.ProxyUserService + org.apache.falcon.service.ProxyUserService,\ + org.apache.falcon.service.FalconJPAService ## Add if you want to use Falcon Azure integration ## # org.apache.falcon.adfservice.ADFProviderService ## If you wish to use Falcon native scheduler add the commented out services below to application.services ## @@ -51,7 +52,7 @@ # org.apache.falcon.notification.service.impl.AlarmService,\ # org.apache.falcon.notification.service.impl.DataAvailabilityService,\ # org.apache.falcon.execution.FalconExecutionService,\ -# org.apache.falcon.state.store.service.FalconJPAService + # List of Lifecycle policies configured. @@ -305,4 +306,4 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle ## Creates Falcon DB. ## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP. ## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up. 
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
+#*.falcon.statestore.create.db.schema=true

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/statestore.credentials
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.credentials b/common/src/main/resources/statestore.credentials
index 86c32a1..b0e4196 100644
--- a/common/src/main/resources/statestore.credentials
+++ b/common/src/main/resources/statestore.credentials
@@ -18,5 +18,5 @@
 ######### StateStore Credentials #####
-#*.falcon.statestore.jdbc.username=sa
-#*.falcon.statestore.jdbc.password=
\ No newline at end of file
+*.falcon.statestore.jdbc.username=sa
+*.falcon.statestore.jdbc.password=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/statestore.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.properties b/common/src/main/resources/statestore.properties
index 44e79b3..7686426 100644
--- a/common/src/main/resources/statestore.properties
+++ b/common/src/main/resources/statestore.properties
@@ -42,4 +42,22 @@
 ## Creates Falcon DB.
 ## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
 ## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
+#*.falcon.statestore.create.db.schema=true
+
+
+######## StateStore Properties #####
+*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true
+*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+# Maximum number of active connections that can be allocated from this pool at the same time.
+*.falcon.statestore.pool.max.active.conn=10
+*.falcon.statestore.connection.properties=
+# Indicates the interval (in milliseconds) between eviction runs.
+*.falcon.statestore.validate.db.connection.eviction.interval=300000
+# The number of objects to examine during each run of the idle object evictor thread.
+*.falcon.statestore.validate.db.connection.eviction.num=10
+# Creates Falcon DB.
+# If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
+# If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
+*.falcon.statestore.create.db.schema=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/docs/src/site/twiki/FalconNativeScheduler.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconNativeScheduler.twiki b/docs/src/site/twiki/FalconNativeScheduler.twiki
index 9ffc5e9..1f51739 100644
--- a/docs/src/site/twiki/FalconNativeScheduler.twiki
+++ b/docs/src/site/twiki/FalconNativeScheduler.twiki
@@ -29,7 +29,7 @@ You can enable native scheduler by making changes to __$FALCON_HOME/conf/startup
     org.apache.falcon.service.ProcessSubscriberService,\
     org.apache.falcon.service.FeedSLAMonitoringService,\
     org.apache.falcon.service.LifecyclePolicyMap,\
-    org.apache.falcon.state.store.service.FalconJPAService,\
+    org.apache.falcon.service.FalconJPAService,\
     org.apache.falcon.entity.store.ConfigurationStore,\
     org.apache.falcon.rerun.service.RetryService,\
     org.apache.falcon.rerun.service.LateRunService,\

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
new file mode 100644
index 0000000..39e2562
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.jdbc;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.falcon.persistence.MonitoredFeedsBean;
+import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.persistence.PersistenceConstants;
+import org.apache.falcon.persistence.ResultNotFoundException;
+import org.apache.falcon.service.FalconJPAService;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.util.Date;
+import java.util.List;
+
+/**
+* StateStore for MonitoringFeeds and PendingFeedInstances.
+*/
+
+public class MonitoringJdbcStateStore {
+
+    private EntityManager getEntityManager() {
+        return FalconJPAService.get().getEntityManager();
+    }
+
+
+    public void putMonitoredFeed(String feedName){
+
+        MonitoredFeedsBean monitoredFeedsBean = new MonitoredFeedsBean();
+        monitoredFeedsBean.setFeedName(feedName);
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            entityManager.persist(monitoredFeedsBean);
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
+    public MonitoredFeedsBean getMonitoredFeed(String feedName){
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE);
+        q.setParameter("feedName", feedName);
+        List result = q.getResultList();
+        try {
+            if (result.isEmpty()) {
+                return null;
+            }
+        } finally {
+            entityManager.close();
+        }
+        return ((MonitoredFeedsBean)result.get(0));
+    }
+
+    public void deleteMonitoringFeed(String feedName) {
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES);
+        q.setParameter("feedName", feedName);
+        try{
+            q.executeUpdate();
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
+    public List<MonitoredFeedsBean> getAllMonitoredFeed() throws ResultNotFoundException{
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS);
+        List result = q.getResultList();
+        try{
+            if (result.isEmpty()) {
+                throw new ResultNotFoundException("No Feed has been scheduled for monitoring.");
+            }
+        } finally {
+            entityManager.close();
+        }
+        return result;
+    }
+
+    public void deletePendingInstance(String feedName, String clusterName , Date nominalTime){
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES);
+        q.setParameter("feedName", feedName);
+        q.setParameter("clusterName", clusterName);
+        q.setParameter("nominalTime", nominalTime);
+        try{
+            q.executeUpdate();
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
+    public void deletePendingInstances(String feedName, String clusterName){
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED);
+        q.setParameter("feedName", feedName);
+        q.setParameter("clusterName", clusterName);
+        try{
+            q.executeUpdate();
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
+    public void putPendingInstances(String feed, String clusterName, Date nominalTime){
+        EntityManager entityManager = getEntityManager();
+        PendingInstanceBean pendingInstanceBean = new PendingInstanceBean();
+        pendingInstanceBean.setFeedName(feed);
+        pendingInstanceBean.setClusterName(clusterName);
+        pendingInstanceBean.setNominalTime(nominalTime);
+
+        beginTransaction(entityManager);
+        entityManager.persist(pendingInstanceBean);
+        commitAndCloseTransaction(entityManager);
+    }
+
+    public List<Date> getNominalInstances(String feedName, String clusterName) throws ResultNotFoundException{
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES);
+        q.setParameter("feedName", feedName);
+        q.setParameter("clusterName", clusterName);
+        List result = q.getResultList();
+        try{
+            if (CollectionUtils.isEmpty(result)) {
+                throw new ResultNotFoundException(feedName + " with " + clusterName + " Not Found");
+            }
+        } finally {
+            entityManager.close();
+        }
+        return result;
+    }
+    public List<PendingInstanceBean> getAllInstances(){
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);
+        List result = q.getResultList();
+
+        try {
+            if (CollectionUtils.isEmpty(result)) {
+                return null;
+            }
+        } finally{
+            entityManager.close();
+        }
+        return result;
+    }
+
+    private void commitAndCloseTransaction(EntityManager entityManager) {
+        entityManager.getTransaction().commit();
+        entityManager.close();
+    }
+
+    private void beginTransaction(EntityManager entityManager) {
+        entityManager.getTransaction().begin();
+    }
+
+}
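
Editor's note: the class above gives the feed SLA monitoring code a small DAO-style API over the new beans. The snippet below is a minimal usage sketch and is not part of the commit; it assumes FalconJPAService has already been initialised during Falcon startup, and the class name SlaStoreExample, the feed name "sample-feed" and the cluster name "sample-cluster" are purely illustrative.

import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
import org.apache.falcon.persistence.PendingInstanceBean;

import java.util.Date;
import java.util.List;

public final class SlaStoreExample {

    public static void main(String[] args) throws Exception {
        // Assumes org.apache.falcon.service.FalconJPAService is already running
        // (it is added to application.services in startup.properties above), so
        // the store can obtain an EntityManager.
        MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();

        // Register a feed for SLA monitoring and record one pending instance.
        // "sample-feed" and "sample-cluster" are hypothetical names.
        Date nominalTime = new Date();
        store.putMonitoredFeed("sample-feed");
        store.putPendingInstances("sample-feed", "sample-cluster", nominalTime);

        // Query everything that is still pending; getAllInstances() returns
        // null when nothing is pending.
        List<PendingInstanceBean> pending = store.getAllInstances();
        System.out.println("pending instances: " + (pending == null ? 0 : pending.size()));

        // Once the instance arrives (or the feed is removed), clean up the rows.
        store.deletePendingInstance("sample-feed", "sample-cluster", nominalTime);
        store.deleteMonitoringFeed("sample-feed");
    }
}

Note that reads simply close the EntityManager, while writes go through beginTransaction/commitAndCloseTransaction, so each store call is an independent unit of work.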

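
Editor's note: the **INVALID** placeholders in persistence.xml exist because the JDBC settings live in statestore.properties and statestore.credentials and are injected when the EntityManagerFactory is created. The sketch below is illustrative only and is not the FalconJPAService implementation from this commit; it shows how the falcon-derby unit could be bootstrapped with the Derby driver, URL, credentials and pool size shown in those files. The StateStoreBootstrapSketch class and the exact ConnectionProperties key/value pairs are assumptions based on how OpenJPA configures a commons-dbcp BasicDataSource.

import java.util.HashMap;
import java.util.Map;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

public final class StateStoreBootstrapSketch {

    public static void main(String[] args) {
        // Values normally read from statestore.properties / statestore.credentials
        // (*.falcon.statestore.jdbc.driver, *.falcon.statestore.jdbc.url, ...).
        String driver = "org.apache.derby.jdbc.EmbeddedDriver";
        String url = "jdbc:derby:target/test-data/data.db;create=true";
        String user = "sa";
        String password = "";
        int maxActive = 10;

        // persistence.xml sets openjpa.ConnectionDriverName to commons-dbcp's
        // BasicDataSource and leaves openjpa.ConnectionProperties as **INVALID**,
        // so the DataSource bean properties are supplied here at init time.
        String connProps = "DriverClassName=" + driver + ",Url=" + url
                + ",Username=" + user + ",Password=" + password
                + ",MaxActive=" + maxActive;

        Map<String, String> overrides = new HashMap<String, String>();
        overrides.put("openjpa.ConnectionProperties", connProps);

        EntityManagerFactory emf =
                Persistence.createEntityManagerFactory("falcon-derby", overrides);
        EntityManager em = emf.createEntityManager();
        try {
            // The unit maps EntityBean, InstanceBean, PendingInstanceBean and
            // MonitoredFeedsBean, so those entities can now be persisted/queried.
            System.out.println("state store ready: " + em.isOpen());
        } finally {
            em.close();
            emf.close();
        }
    }
}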