http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index 29bd7ba..b5a2569 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -17,7 +17,7 @@ */ package org.apache.falcon.service; -import org.apache.commons.io.IOUtils; +import org.apache.commons.collections.CollectionUtils; import org.apache.falcon.FalconException; import org.apache.falcon.Pair; import org.apache.falcon.entity.EntityUtil; @@ -31,6 +31,9 @@ import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Sla; import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.jdbc.MonitoringJdbcStateStore; +import org.apache.falcon.persistence.MonitoredFeedsBean; +import org.apache.falcon.persistence.PendingInstanceBean; import org.apache.falcon.resource.SchedulableEntityInstance; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.StartupProperties; @@ -38,24 +41,15 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; -import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; 
import java.util.Set; +import java.util.ArrayList; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -66,6 +60,8 @@ import java.util.concurrent.TimeUnit; public final class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService { private static final Logger LOG = LoggerFactory.getLogger("FeedSLA"); + private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore(); + private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000); private static final int ONE_MS = 1; @@ -88,29 +84,10 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); /** - * Feeds to be monitored. - */ - protected Set<String> monitoredFeeds; - - - /** - * Map<Pair<feedName, clusterName>, Set<instanceTime> to store - * each missing instance of a feed. - */ - protected Map<Pair<String, String>, BlockingQueue<Date>> pendingInstances; - - - /** * Used to store the last time when pending instances were checked for SLA. */ private Date lastCheckedAt; - - /** - * Used to store last time when the state was serialized to the store. - */ - private Date lastSerializedAt; - /** * Frequency in seconds of "status check" for pending feed instances. 
*/ @@ -156,7 +133,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen if (currentClusters.contains(cluster.getName())) { if (FeedHelper.getSLA(cluster, feed) != null) { LOG.debug("Adding feed:{} for monitoring", feed.getName()); - monitoredFeeds.add(feed.getName()); + MONITORING_JDBC_STATE_STORE.putMonitoredFeed(feed.getName()); } } } @@ -173,8 +150,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); for (Cluster cluster : feed.getClusters().getClusters()) { if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { - monitoredFeeds.remove(feed.getName()); - pendingInstances.remove(new Pair<>(feed.getName(), cluster.getName())); + MONITORING_JDBC_STATE_STORE.deleteMonitoringFeed(feed.getName()); + MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName()); } } } @@ -212,7 +189,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen } for (String clusterName : slaRemovedClusters) { - pendingInstances.remove(new Pair<>(newFeed.getName(), clusterName)); + MONITORING_JDBC_STATE_STORE.deletePendingInstances(newFeed.getName(), clusterName); } } } @@ -247,32 +224,21 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen String size = StartupProperties.get().getProperty("feed.sla.queue.size", "288"); queueSize = Integer.parseInt(size); - try { - if (fileSystem.exists(filePath)) { - deserialize(filePath); - } else { - LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString()); - initializeService(); - } - } catch (IOException e) { - throw new FalconException("Couldn't check the existence of " + filePath, e); - } + LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString()); + initializeService(); + ScheduledThreadPoolExecutor executor = new 
ScheduledThreadPoolExecutor(1); executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS); } - @Override - public void destroy() throws FalconException { - serializeState(); // store the state of monitoring service to the disk. - } - - public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime) { + public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime) + throws FalconException { LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName, clusterName, nominalTime); - Pair<String, String> feedCluster = new Pair<>(feedName, clusterName); + List<Date> instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName)); // Slas for feeds not having sla tag are not stored. - if (pendingInstances.get(feedCluster) != null) { - pendingInstances.get(feedCluster).remove(nominalTime); + if (CollectionUtils.isEmpty(instances)){ + MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime); } } @@ -290,13 +256,17 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen } } + @Override + public void destroy() throws FalconException { + } + //Periodically update status of pending instances, add new instances and take backup. 
private class Monitor implements Runnable { @Override public void run() { try { - if (!monitoredFeeds.isEmpty()) { + if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 0) { checkPendingInstanceAvailability(); // add Instances from last checked time to 10 minutes from now(some buffer for status check) @@ -304,12 +274,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis); addNewPendingFeedInstances(lastCheckedAt, newCheckPoint); lastCheckedAt = newCheckPoint; - - //take backup - if (now.getTime() - lastSerializedAt.getTime() > serializationFrequencyMillis) { - serializeState(); - lastSerializedAt = new Date(); - } } } catch (Throwable e) { LOG.error("Feed SLA monitoring failed: ", e); @@ -320,14 +284,18 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen void addNewPendingFeedInstances(Date from, Date to) throws FalconException { Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); - for (String feedName : monitoredFeeds) { + List<MonitoredFeedsBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed(); + for(MonitoredFeedsBean monitoredFeedsBean : feedsBeanList) { + String feedName = monitoredFeedsBean.getFeedName(); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); for (Cluster feedCluster : feed.getClusters().getClusters()) { if (currentClusters.contains(feedCluster.getName())) { Date nextInstanceTime = from; Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName()); - BlockingQueue<Date> instances = pendingInstances.get(key); - if (instances == null) { + BlockingQueue<Date> instances = new LinkedBlockingQueue<>( + MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, feedCluster.getName())); + if (CollectionUtils.isEmpty(MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, + feedCluster.getName()))) { instances = new LinkedBlockingQueue<>(queueSize); Date 
feedStartTime = feedCluster.getValidity().getStart(); Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, feedCluster); @@ -357,7 +325,9 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime); } - pendingInstances.put(key, instances); + for(Date date:instances){ + MONITORING_JDBC_STATE_STORE.putPendingInstances(feed.getName(), feedCluster.getName(), date); + } } } } @@ -368,11 +338,14 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen * Checks the availability of all the pendingInstances and removes the ones which have become available. */ private void checkPendingInstanceAvailability() throws FalconException { - for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry: pendingInstances.entrySet()) { - for (Date date : entry.getValue()) { - boolean status = checkFeedInstanceAvailability(entry.getKey().first, entry.getKey().second, date); + for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){ + for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(), + pendingInstanceBean.getClusterName())) { + boolean status = checkFeedInstanceAvailability(pendingInstanceBean.getFeedName(), + pendingInstanceBean.getClusterName(), date); if (status) { - pendingInstances.get(entry.getKey()).remove(date); + MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getFeedName(), + pendingInstanceBean.getClusterName(), date); } } } @@ -402,79 +375,9 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen return false; } - private void serializeState() throws FalconException{ - LOG.info("Saving context to: [{}]", storePath); - - //create a temporary file and rename it. 
- Path tmp = new Path(storePath , "tmp"); - ObjectOutputStream oos = null; - try { - OutputStream out = fileSystem.create(tmp); - oos = new ObjectOutputStream(out); - Map<String, Object> state = new HashMap<>(); - state.put("lastSerializedAt", lastSerializedAt.getTime()); - state.put("lastCheckedAt", lastCheckedAt.getTime()); - state.put("pendingInstances", pendingInstances); - oos.writeObject(state); - fileSystem.rename(tmp, filePath); - } catch (IOException e) { - throw new FalconException("Error serializing context to : " + storePath.toUri(), e); - } finally { - IOUtils.closeQuietly(oos); - } - } - - @SuppressWarnings("unchecked") - private void deserialize(Path path) throws FalconException { - try { - Map<String, Object> state = deserializeInternal(path); - pendingInstances = new ConcurrentHashMap<>(); - Map<Pair<String, String>, BlockingQueue<Date>> pendingInstancesCopy = - (Map<Pair<String, String>, BlockingQueue<Date>>) state.get("pendingInstances"); - // queue size can change during restarts, hence copy - for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry : pendingInstancesCopy.entrySet()) { - BlockingQueue<Date> value = new LinkedBlockingQueue<>(queueSize); - BlockingQueue<Date> oldValue = entry.getValue(); - LOG.debug("Number of old instances:{}, new queue size:{}", oldValue.size(), queueSize); - while (!oldValue.isEmpty()) { - Date instance = oldValue.remove(); - if (value.size() == queueSize) { // if full - LOG.debug("Deserialization: Removing value={} for <feed,cluster>={}", value.peek(), - entry.getKey()); - value.remove(); - } - LOG.debug("Deserialization Adding: key={} to <feed,cluster>={}", entry.getKey(), instance); - value.add(instance); - } - pendingInstances.put(entry.getKey(), value); - } - lastCheckedAt = new Date((Long) state.get("lastCheckedAt")); - lastSerializedAt = new Date((Long) state.get("lastSerializedAt")); - monitoredFeeds = new ConcurrentHashSet<>(); // will be populated on the onLoad of entities. 
- LOG.debug("Restored the service from old state."); - } catch (IOException | ClassNotFoundException e) { - throw new FalconException("Couldn't deserialize the old state", e); - } - } protected void initializeService() { - pendingInstances = new ConcurrentHashMap<>(); lastCheckedAt = new Date(); - lastSerializedAt = new Date(); - monitoredFeeds = new ConcurrentHashSet<>(); - } - - @SuppressWarnings("unchecked") - private Map<String, Object> deserializeInternal(Path path) throws IOException, ClassNotFoundException { - Map<String, Object> state; - InputStream in = fileSystem.open(path); - ObjectInputStream ois = new ObjectInputStream(in); - try { - state = (Map<String, Object>) ois.readObject(); - } finally { - IOUtils.closeQuietly(ois); - } - return state; } /** @@ -492,13 +395,16 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date start, Date end) throws FalconException { Set<SchedulableEntityInstance> result = new HashSet<>(); - for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> feedInstances : pendingInstances.entrySet()) { - Pair<String, String> feedClusterPair = feedInstances.getKey(); + for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){ + Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getFeedName(), + pendingInstanceBean.getClusterName()); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first); Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); Sla sla = FeedHelper.getSLA(cluster, feed); if (sla != null) { - Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, feedInstances.getValue()); + Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, + new LinkedBlockingQueue<Date>(MONITORING_JDBC_STATE_STORE.getNominalInstances( + pendingInstanceBean.getFeedName(), pendingInstanceBean.getClusterName()))); for (Pair<Date, String> 
status : slaStatus){ SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first, feedClusterPair.second, status.first, EntityType.FEED); @@ -525,7 +431,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen Set<SchedulableEntityInstance> result = new HashSet<>(); Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName); - BlockingQueue<Date> missingInstances = pendingInstances.get(feedClusterPair); + BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>(MONITORING_JDBC_STATE_STORE. + getNominalInstances(feedName, clusterName)); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); Sla sla = FeedHelper.getSLA(cluster, feed);
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java new file mode 100644 index 0000000..aa32167 --- /dev/null +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.jdbc; + +import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.entity.AbstractTestBase; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.service.FalconJPAService; +import org.apache.falcon.tools.FalconStateStoreDBCLI; +import org.apache.falcon.util.StateStoreProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.util.Date; +import java.util.Random; + +/** +*Unit test for MonitoringJdbcStateStore. + * */ + +public class MonitoringJdbcStateStoreTest extends AbstractTestBase { + private static final String DB_BASE_DIR = "target/test-data/persistancedb"; + protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db"; + protected static String url = "jdbc:derby:"+ dbLocation +";create=true"; + protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; + protected LocalFileSystem fs = new LocalFileSystem(); + + private static Random randomValGenerator = new Random(); + private static FalconJPAService falconJPAService = FalconJPAService.get(); + + protected int execDBCLICommands(String[] args) { + return new FalconStateStoreDBCLI().run(args); + } + + public void createDB(String file) { + File sqlFile = new File(file); + String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" }; + int result = execDBCLICommands(argsCreate); + Assert.assertEquals(0, result); + Assert.assertTrue(sqlFile.exists()); + + } + + @BeforeClass + public void setup() throws Exception{ + StateStoreProperties.get().setProperty(FalconJPAService.URL, url); + Configuration localConf = new Configuration(); + fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf); + fs.mkdirs(new Path(DB_BASE_DIR)); + 
createDB(DB_SQL_FILE); + falconJPAService.init(); + this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); + this.conf = dfsCluster.getConf(); + } + + @Test + public void testInsertRetrieveAndUpdate() throws Exception { + + MonitoringJdbcStateStore monitoringJdbcStateStore = new MonitoringJdbcStateStore(); + monitoringJdbcStateStore.putMonitoredFeed("test_feed1"); + monitoringJdbcStateStore.putMonitoredFeed("test_feed2"); + Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName()); + Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(), 2); + + monitoringJdbcStateStore.deleteMonitoringFeed("test_feed1"); + monitoringJdbcStateStore.deleteMonitoringFeed("test_feed2"); + Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); + Date dateTwo = SchemaHelper.parseDateUTC("2015-11-20T01:00Z"); + monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateOne); + monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateTwo); + + Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 2); + monitoringJdbcStateStore.deletePendingInstance("test_feed1", "test_cluster", dateOne); + Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 1); + monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster"); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java index 90eec4d..b739037 100644 --- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java +++ 
b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java @@ -121,40 +121,6 @@ public class FeedSLAMonitoringTest extends AbstractTestBase { AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", null, "*"); } - @Test - public void testMakeFeedInstanceAvailable() { - Date instanceDate = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); - Date nextInstanceDate = SchemaHelper.parseDateUTC("2015-11-20T01:00Z"); - Pair<String, String> feedCluster = new Pair<>("testFeed", "testCluster"); - - BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>(); - missingInstances.add(instanceDate); - missingInstances.add(nextInstanceDate); - - FeedSLAMonitoringService.get().initializeService(); - FeedSLAMonitoringService.get().pendingInstances.put(feedCluster, missingInstances); - FeedSLAMonitoringService.get().makeFeedInstanceAvailable("testFeed", "testCluster", instanceDate); - - Assert.assertEquals(FeedSLAMonitoringService.get().pendingInstances.get(feedCluster).size(), 1); - } - - @Test - public void testEndDateCheck() throws Exception { - Cluster cluster = publishCluster(); - publishFeed(cluster, "hours(1)", "2015-11-20 00:00 UTC", "2015-11-20 05:00 UTC"); - Pair<String, String> feedCluster = new Pair<>(FEED_NAME, CLUSTER_NAME); - - FeedSLAMonitoringService service = FeedSLAMonitoringService.get(); - service.initializeService(); - service.queueSize = 100; - service.monitoredFeeds.add(FEED_NAME); - Date from = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); - Date to = SchemaHelper.parseDateUTC("2015-11-25T00:00Z"); - service.addNewPendingFeedInstances(from, to); - // check that instances after feed's end date are not added. 
- Assert.assertEquals(service.pendingInstances.get(feedCluster).size(), 5); - } - private Cluster publishCluster() throws FalconException { Cluster cluster = new Cluster(); cluster.setName(CLUSTER_NAME); http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/pom.xml ---------------------------------------------------------------------- diff --git a/scheduler/pom.xml b/scheduler/pom.xml index dc006a1..6cb1c0d 100644 --- a/scheduler/pom.xml +++ b/scheduler/pom.xml @@ -95,26 +95,6 @@ </dependency> <dependency> - <groupId>org.apache.openjpa</groupId> - <artifactId>openjpa-jdbc</artifactId> - <version>${openjpa.version}</version> - <scope>compile</scope> - </dependency> - - <dependency> - <groupId>org.apache.openjpa</groupId> - <artifactId>openjpa-persistence-jdbc</artifactId> - <version>${openjpa.version}</version> - <scope>compile</scope> - </dependency> - - <dependency> - <groupId>javax.validation</groupId> - <artifactId>validation-api</artifactId> - <version>${javax-validation.version}</version> - </dependency> - - <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> </dependency> @@ -169,27 +149,7 @@ </configuration> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.8</version> - <executions> - <execution> - <phase>process-classes</phase> - <configuration> - <tasks> - <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask" classpathref="maven.compile.classpath"/> - <openjpac> - <classpath refid="maven.compile.classpath"/> - </openjpac> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java 
---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java index 194819e..3384186 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java @@ -27,6 +27,8 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.execution.ExecutionInstance; import org.apache.falcon.execution.ProcessExecutionInstance; +import org.apache.falcon.persistence.EntityBean; +import org.apache.falcon.persistence.InstanceBean; import org.apache.falcon.predicate.Predicate; import org.apache.falcon.state.EntityID; import org.apache.falcon.state.EntityState; http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java deleted file mode 100644 index 37fb0cb..0000000 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.state.store.jdbc; - -import org.apache.openjpa.persistence.jdbc.Index; - -import javax.persistence.Basic; -import javax.persistence.CascadeType; -import javax.persistence.Column; -import javax.persistence.Entity; -import javax.persistence.Id; -import javax.persistence.NamedQueries; -import javax.persistence.NamedQuery; -import javax.persistence.OneToMany; -import javax.persistence.Table; -import javax.validation.constraints.NotNull; -import java.util.List; -//SUSPEND CHECKSTYLE CHECK LineLengthCheck -/** - * Entity object which will be stored in Data Base. 
- */ -@Entity -@NamedQueries({ - @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"), - @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"), - @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"), - @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"), - @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"), - @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"), - @NamedQuery(name = "DELETE_ENTITIES", query = "delete from EntityBean")}) -//RESUME CHECKSTYLE CHECK LineLengthCheck -@Table(name = "ENTITIES") -public class EntityBean { - @NotNull - @Id - private String id; - - @Basic - @NotNull - @Column(name = "name") - private String name; - - - @Basic - @Index - @NotNull - @Column(name = "type") - private String type; - - @Basic - @Index - @NotNull - @Column(name = "current_state") - private String state; - - @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean") - private List<InstanceBean> instanceBeans; - - public EntityBean() { - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getState() { - return state; - } - - public void setState(String state) { - this.state = state; - } - - public List<InstanceBean> getInstanceBeans() { - return instanceBeans; - } - - public void setInstanceBeans(List<InstanceBean> instanceBeans) { - this.instanceBeans = instanceBeans; - } -} - 
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java deleted file mode 100644 index dffb116..0000000 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java +++ /dev/null @@ -1,229 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.falcon.state.store.jdbc; - -import org.apache.openjpa.persistence.jdbc.ForeignKey; -import org.apache.openjpa.persistence.jdbc.ForeignKeyAction; -import org.apache.openjpa.persistence.jdbc.Index; - -import javax.persistence.Basic; -import javax.persistence.CascadeType; -import javax.persistence.Column; -import javax.persistence.Entity; -import javax.persistence.Id; -import javax.persistence.Lob; -import javax.persistence.ManyToOne; -import javax.persistence.NamedQueries; -import javax.persistence.NamedQuery; -import javax.persistence.Table; -import javax.validation.constraints.NotNull; -import java.sql.Timestamp; - -//SUSPEND CHECKSTYLE CHECK LineLengthCheck -/** - * Instance State which will be stored in DB. - */ -@Entity -@NamedQueries({ - @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"), - @NamedQuery(name = "GET_INSTANCE_FOR_EXTERNAL_ID", query = "select OBJECT(a) from InstanceBean a where a.externalID = :externalID"), - @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"), - @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"), - @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates, a.properties = :properties where a.id = :id"), - @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"), - @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN 
(:currentState)"), - @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"), - @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"), - @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"), - @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"), - @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState") -}) -//RESUME CHECKSTYLE CHECK LineLengthCheck -@Table(name = "INSTANCES") -public class InstanceBean { - - @Id - @NotNull - private String id; - - @Basic - @Index - @NotNull - @Column(name = "entity_id") - private String entityId; - - @Basic - @Index - @NotNull - @Column(name = "cluster") - private String cluster; - - @Basic - @Index - @Column(name = "external_id") - private String externalID; - - @Basic - @Index - @Column(name = "instance_time") - private Timestamp instanceTime; - - @Basic - @Index - @NotNull - @Column(name = "creation_time") - private Timestamp creationTime; - - @Basic - @Column(name = "actual_start_time") - private Timestamp actualStartTime; - - @Basic - @Column(name = "actual_end_time") - private Timestamp actualEndTime; - - @Basic - @Index - @NotNull - @Column(name = "current_state") - private String currentState; - - @Basic - @Index - @NotNull - @Column(name = "instance_sequence") - private Integer instanceSequence; - - 
@ForeignKey(deleteAction= ForeignKeyAction.CASCADE) - @ManyToOne(cascade= CascadeType.REMOVE) - private EntityBean entityBean; - - - @Column(name = "awaited_predicates") - @Lob - private byte[] awaitedPredicates; - - @Column(name = "properties") - @Lob - private byte[] properties; - - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getCluster() { - return cluster; - } - - public void setCluster(String cluster) { - this.cluster = cluster; - } - - public String getExternalID() { - return externalID; - } - - public void setExternalID(String externalID) { - this.externalID = externalID; - } - - public Timestamp getInstanceTime() { - return instanceTime; - } - - public void setInstanceTime(Timestamp instanceTime) { - this.instanceTime = instanceTime; - } - - public Timestamp getCreationTime() { - return creationTime; - } - - public void setCreationTime(Timestamp creationTime) { - this.creationTime = creationTime; - } - - public Timestamp getActualStartTime() { - return actualStartTime; - } - - public void setActualStartTime(Timestamp actualStartTime) { - this.actualStartTime = actualStartTime; - } - - public Timestamp getActualEndTime() { - return actualEndTime; - } - - public void setActualEndTime(Timestamp actualEndTime) { - this.actualEndTime = actualEndTime; - } - - public String getCurrentState() { - return currentState; - } - - public void setCurrentState(String currentState) { - this.currentState = currentState; - } - - public byte[] getAwaitedPredicates() { - return awaitedPredicates; - } - - public void setAwaitedPredicates(byte[] awaitedPredicates) { - this.awaitedPredicates = awaitedPredicates; - } - - public Integer getInstanceSequence() { - return instanceSequence; - } - - public void setInstanceSequence(Integer instanceSequence) { - this.instanceSequence = instanceSequence; - } - - public String getEntityId() { - return entityId; - } - - public void setEntityId(String entityId) { - 
this.entityId = entityId; - } - - public byte[] getProperties() { - return properties; - } - - public void setProperties(byte[] properties) { - this.properties = properties; - } - - public EntityBean getEntityBean() { - return entityBean; - } - - public void setEntityBean(EntityBean entityBean) { - this.entityBean = entityBean; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java index 1c07286..d2bb8c8 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java +++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java @@ -22,6 +22,8 @@ import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.persistence.EntityBean; +import org.apache.falcon.persistence.InstanceBean; import org.apache.falcon.state.EntityClusterID; import org.apache.falcon.state.EntityID; import org.apache.falcon.state.EntityState; @@ -30,7 +32,7 @@ import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.StateStore; -import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.util.StateStoreProperties; import org.joda.time.DateTime; http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java ---------------------------------------------------------------------- 
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java deleted file mode 100644 index 027a8ef..0000000 --- a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.state.store.service; - -import org.apache.commons.lang.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.service.FalconService; -import org.apache.falcon.state.store.jdbc.EntityBean; -import org.apache.falcon.state.store.jdbc.InstanceBean; -import org.apache.falcon.util.StateStoreProperties; -import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.persistence.EntityManager; -import javax.persistence.EntityManagerFactory; -import javax.persistence.Persistence; -import java.text.MessageFormat; -import java.util.Properties; - -/** - * Service that manages JPA. 
- */ -public final class FalconJPAService implements FalconService { - - private static final Logger LOG = LoggerFactory.getLogger(FalconJPAService.class); - public static final String PREFIX = "falcon.statestore."; - - public static final String DB_SCHEMA = PREFIX + "schema.name"; - public static final String URL = PREFIX + "jdbc.url"; - public static final String DRIVER = PREFIX + "jdbc.driver"; - public static final String USERNAME = PREFIX + "jdbc.username"; - public static final String PASSWORD = PREFIX + "jdbc.password"; - public static final String CONN_DATA_SOURCE = PREFIX + "connection.data.source"; - public static final String CONN_PROPERTIES = PREFIX + "connection.properties"; - public static final String MAX_ACTIVE_CONN = PREFIX + "pool.max.active.conn"; - public static final String CREATE_DB_SCHEMA = PREFIX + "create.db.schema"; - public static final String VALIDATE_DB_CONN = PREFIX + "validate.db.connection"; - public static final String VALIDATE_DB_CONN_EVICTION_INTERVAL = PREFIX + "validate.db.connection.eviction.interval"; - public static final String VALIDATE_DB_CONN_EVICTION_NUM = PREFIX + "validate.db.connection.eviction.num"; - - private EntityManagerFactory entityManagerFactory; - // Persistent Unit which is defined in persistence.xml - private String persistenceUnit; - private static final FalconJPAService FALCON_JPA_SERVICE = new FalconJPAService(); - - private FalconJPAService() { - } - - public static FalconJPAService get() { - return FALCON_JPA_SERVICE; - } - - public EntityManagerFactory getEntityManagerFactory() { - return entityManagerFactory; - } - - public void setPersistenceUnit(String dbType) { - if (StringUtils.isEmpty(dbType)) { - throw new IllegalArgumentException(" DB type cannot be null or empty"); - } - dbType = dbType.split(":")[0]; - this.persistenceUnit = "falcon-" + dbType; - } - - @Override - public String getName() { - return this.getClass().getSimpleName(); - } - - @Override - public void init() throws FalconException 
{ - Properties props = getPropsforStore(); - entityManagerFactory = Persistence. - createEntityManagerFactory(persistenceUnit, props); - EntityManager entityManager = getEntityManager(); - entityManager.find(EntityBean.class, 1); - entityManager.find(InstanceBean.class, 1); - LOG.info("All entities initialized"); - - // need to use a pseudo no-op transaction so all entities, datasource - // and connection pool are initialized one time only - entityManager.getTransaction().begin(); - OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) entityManagerFactory; - // Mask the password with '***' - String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,"); - LOG.info("JPA configuration: {0}", logMsg); - entityManager.getTransaction().commit(); - entityManager.close(); - } - - private Properties getPropsforStore() throws FalconException { - String dbSchema = StateStoreProperties.get().getProperty(DB_SCHEMA); - String url = StateStoreProperties.get().getProperty(URL); - String driver = StateStoreProperties.get().getProperty(DRIVER); - String user = StateStoreProperties.get().getProperty(USERNAME); - String password = StateStoreProperties.get().getProperty(PASSWORD).trim(); - String maxConn = StateStoreProperties.get().getProperty(MAX_ACTIVE_CONN).trim(); - String dataSource = StateStoreProperties.get().getProperty(CONN_DATA_SOURCE); - String connPropsConfig = StateStoreProperties.get().getProperty(CONN_PROPERTIES); - boolean autoSchemaCreation = Boolean.parseBoolean(StateStoreProperties.get().getProperty(CREATE_DB_SCHEMA, - "false")); - boolean validateDbConn = Boolean.parseBoolean(StateStoreProperties.get().getProperty(VALIDATE_DB_CONN, "true")); - String evictionInterval = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim(); - String evictionNum = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim(); - - if (!url.startsWith("jdbc:")) { - throw new 
FalconException("invalid JDBC URL, must start with 'jdbc:'" + url); - } - String dbType = url.substring("jdbc:".length()); - if (dbType.indexOf(":") <= 0) { - throw new FalconException("invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'" + url); - } - setPersistenceUnit(dbType); - String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}"; - connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn); - Properties props = new Properties(); - if (autoSchemaCreation) { - connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; - props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)"); - } else if (validateDbConn) { - // validation can be done only if the schema already exist, else a - // connection cannot be obtained to create the schema. - String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval; - String num = "numTestsPerEvictionRun=" + evictionNum; - connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num; - connProps += ",ValidationQuery=select 1"; - connProps = MessageFormat.format(connProps, dbSchema); - } else { - connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; - } - if (connPropsConfig != null) { - connProps += "," + connPropsConfig; - } - props.setProperty("openjpa.ConnectionProperties", connProps); - props.setProperty("openjpa.ConnectionDriverName", dataSource); - return props; - } - - @Override - public void destroy() throws FalconException { - if (entityManagerFactory.isOpen()) { - entityManagerFactory.close(); - } - } - - - /** - * Return an EntityManager. Used by the StoreService. 
- * - * @return an entity manager - */ - public EntityManager getEntityManager() { - return getEntityManagerFactory().createEntityManager(); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java deleted file mode 100644 index 6de9f7d..0000000 --- a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java +++ /dev/null @@ -1,436 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.falcon.tools; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.falcon.cli.CLIParser; -import org.apache.falcon.state.store.service.FalconJPAService; -import org.apache.falcon.util.BuildProperties; -import org.apache.falcon.util.StateStoreProperties; - -import java.io.File; -import java.io.FileWriter; -import java.io.PrintWriter; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Command Line utility for Table Creation, Update. - */ -public class FalconStateStoreDBCLI { - public static final String HELP_CMD = "help"; - public static final String VERSION_CMD = "version"; - public static final String CREATE_CMD = "create"; - public static final String SQL_FILE_OPT = "sqlfile"; - public static final String RUN_OPT = "run"; - public static final String UPGRADE_CMD = "upgrade"; - - // Represents whether DB instance exists or not. 
- private boolean instanceExists; - private static final String[] FALCON_HELP = - {"Falcon DB initialization tool currently supports Derby DB/ Mysql/ PostgreSQL"}; - - public static void main(String[] args) { - new FalconStateStoreDBCLI().run(args); - } - - public FalconStateStoreDBCLI() { - instanceExists = false; - } - - protected Options getOptions() { - Option sqlfile = new Option(SQL_FILE_OPT, true, - "Generate SQL script instead of creating/upgrading the DB schema"); - Option run = new Option(RUN_OPT, false, "Confirmation option regarding DB schema creation/upgrade"); - Options options = new Options(); - options.addOption(sqlfile); - options.addOption(run); - return options; - } - - public synchronized int run(String[] args) { - if (instanceExists) { - throw new IllegalStateException("CLI instance already used"); - } - instanceExists = true; - - CLIParser parser = new CLIParser("falcondb", FALCON_HELP); - parser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false); - parser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false); - parser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false); - parser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false); - - try { - CLIParser.Command command = parser.parse(args); - if (command.getName().equals(HELP_CMD)) { - parser.showHelp(); - } else if (command.getName().equals(VERSION_CMD)) { - showVersion(); - } else { - if (!command.getCommandLine().hasOption(SQL_FILE_OPT) - && !command.getCommandLine().hasOption(RUN_OPT)) { - throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified"); - } - CommandLine commandLine = command.getCommandLine(); - String sqlFile = (commandLine.hasOption(SQL_FILE_OPT)) - ? 
commandLine.getOptionValue(SQL_FILE_OPT) - : File.createTempFile("falcondb-", ".sql").getAbsolutePath(); - boolean run = commandLine.hasOption(RUN_OPT); - if (command.getName().equals(CREATE_CMD)) { - createDB(sqlFile, run); - } else if (command.getName().equals(UPGRADE_CMD)) { - upgradeDB(sqlFile, run); - } - System.out.println("The SQL commands have been written to: " + sqlFile); - if (!run) { - System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option"); - } - } - return 0; - } catch (ParseException ex) { - System.err.println("Invalid sub-command: " + ex.getMessage()); - System.err.println(); - System.err.println(parser.shortHelp()); - return 1; - } catch (Exception ex) { - System.err.println(); - System.err.println("Error: " + ex.getMessage()); - System.err.println(); - System.err.println("Stack trace for the error was (for debug purposes):"); - System.err.println("--------------------------------------"); - ex.printStackTrace(System.err); - System.err.println("--------------------------------------"); - System.err.println(); - return 1; - } - } - - private void upgradeDB(String sqlFile, boolean run) throws Exception { - validateConnection(); - if (!checkDBExists()) { - throw new Exception("Falcon DB doesn't exist"); - } - String falconVersion = BuildProperties.get().getProperty("project.version"); - String dbVersion = getFalconDBVersion(); - if (dbVersion.compareTo(falconVersion) >= 0) { - System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'"); - return; - } - - createUpgradeDB(sqlFile, run, false); - upgradeFalconDBVersion(sqlFile, run, falconVersion); - - // any post upgrade tasks - if (run) { - System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'"); - } - } - - - private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception { - String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where 
name='db.version'"; - PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true)); - writer.println(); - writer.println(updateDBVersion); - writer.close(); - System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version); - if (run) { - Connection conn = createConnection(); - Statement st = null; - try { - conn.setAutoCommit(true); - st = conn.createStatement(); - st.executeUpdate(updateDBVersion); - st.close(); - } catch (Exception ex) { - throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex); - } finally { - closeStatement(st); - conn.close(); - } - } - System.out.println("DONE"); - } - - private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'"; - - private String getFalconDBVersion() throws Exception { - String version; - System.out.println("Get Falcon DB version"); - Connection conn = createConnection(); - Statement st = null; - ResultSet rs = null; - try { - st = conn.createStatement(); - rs = st.executeQuery(GET_FALCON_DB_VERSION); - if (rs.next()) { - version = rs.getString(1); - } else { - throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table"); - } - } catch (Exception ex) { - throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex); - } finally { - closeResultSet(rs); - closeStatement(st); - conn.close(); - } - System.out.println("DONE"); - return version; - } - - - private Map<String, String> getJdbcConf() throws Exception { - Map<String, String> jdbcConf = new HashMap<String, String>(); - jdbcConf.put("driver", StateStoreProperties.get().getProperty(FalconJPAService.DRIVER)); - String url = StateStoreProperties.get().getProperty(FalconJPAService.URL); - jdbcConf.put("url", url); - jdbcConf.put("user", StateStoreProperties.get().getProperty(FalconJPAService.USERNAME)); - jdbcConf.put("password", StateStoreProperties.get().getProperty(FalconJPAService.PASSWORD)); 
- String dbType = url.substring("jdbc:".length()); - if (dbType.indexOf(":") <= 0) { - throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'"); - } - dbType = dbType.substring(0, dbType.indexOf(":")); - jdbcConf.put("dbtype", dbType); - return jdbcConf; - } - - private String[] createMappingToolArguments(String sqlFile) throws Exception { - Map<String, String> conf = getJdbcConf(); - List<String> args = new ArrayList<String>(); - args.add("-schemaAction"); - args.add("add"); - args.add("-p"); - args.add("persistence.xml#falcon-" + conf.get("dbtype")); - args.add("-connectionDriverName"); - args.add(conf.get("driver")); - args.add("-connectionURL"); - args.add(conf.get("url")); - args.add("-connectionUserName"); - args.add(conf.get("user")); - args.add("-connectionPassword"); - args.add(conf.get("password")); - if (sqlFile != null) { - args.add("-sqlFile"); - args.add(sqlFile); - } - args.add("-indexes"); - args.add("true"); - args.add("org.apache.falcon.state.store.jdbc.EntityBean"); - args.add("org.apache.falcon.state.store.jdbc.InstanceBean"); - return args.toArray(new String[args.size()]); - } - - private void createDB(String sqlFile, boolean run) throws Exception { - validateConnection(); - if (checkDBExists()) { - return; - } - - verifyFalconPropsTable(false); - createUpgradeDB(sqlFile, run, true); - createFalconPropsTable(sqlFile, run, BuildProperties.get().getProperty("project.version")); - if (run) { - System.out.println("Falcon DB has been created for Falcon version '" - + BuildProperties.get().getProperty("project.version") + "'"); - } - } - - private static final String CREATE_FALCON_DB_PROPS = - "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))"; - - private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception { - String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')"; - - PrintWriter writer = new PrintWriter(new 
FileWriter(sqlFile, true)); - writer.println(); - writer.println(CREATE_FALCON_DB_PROPS); - writer.println(insertDbVerion); - writer.close(); - System.out.println("Create FALCON_DB_PROPS table"); - if (run) { - Connection conn = createConnection(); - Statement st = null; - try { - conn.setAutoCommit(true); - st = conn.createStatement(); - st.executeUpdate(CREATE_FALCON_DB_PROPS); - st.executeUpdate(insertDbVerion); - st.close(); - } catch (Exception ex) { - closeStatement(st); - throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex); - } finally { - conn.close(); - } - } - System.out.println("DONE"); - } - - private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS"; - - private boolean verifyFalconPropsTable(boolean exists) throws Exception { - System.out.println((exists) ? "Check FALCON_DB_PROPS table exists" - : "Checking FALCON_DB_PROPS table does not exist"); - boolean tableExists; - Connection conn = createConnection(); - Statement st = null; - ResultSet rs = null; - try { - st = conn.createStatement(); - rs = st.executeQuery(FALCON_DB_PROPS_EXISTS); - rs.next(); - tableExists = true; - } catch (Exception ex) { - tableExists = false; - } finally { - closeResultSet(rs); - closeStatement(st); - conn.close(); - } - if (tableExists != exists) { - throw new Exception("FALCON_DB_PROPS_TABLE table " + ((exists) ? 
"does not exist" : "exists")); - } - System.out.println("DONE"); - return tableExists; - } - - private void closeResultSet(ResultSet rs) { - try { - if (rs != null) { - rs.close(); - } - } catch (Exception e) { - System.out.println("Unable to close ResultSet " + rs); - } - } - - private void closeStatement(Statement st) throws Exception { - try { - if (st != null) { - st.close(); - } - } catch (Exception e) { - System.out.println("Unable to close SQL Statement " + st); - throw new Exception(e); - } - } - - private Connection createConnection() throws Exception { - Map<String, String> conf = getJdbcConf(); - Class.forName(conf.get("driver")).newInstance(); - return DriverManager.getConnection(conf.get("url"), conf.get("user"), conf.get("password")); - } - - private void validateConnection() throws Exception { - System.out.println("Validating DB Connection"); - try { - createConnection().close(); - System.out.println("DONE"); - } catch (Exception ex) { - throw new Exception("Could not connect to the database: " + ex.toString(), ex); - } - } - - private static final String ENTITY_STATUS_QUERY = - "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')"; - private static final String INSTANCE_STATUS_QUERY = - "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')"; - - private boolean checkDBExists() throws Exception { - boolean schemaExists; - Connection conn = createConnection(); - ResultSet rs = null; - Statement st = null; - try { - st = conn.createStatement(); - rs = st.executeQuery(ENTITY_STATUS_QUERY); - rs.next(); - schemaExists = true; - } catch (Exception ex) { - schemaExists = false; - } finally { - closeResultSet(rs); - closeStatement(st); - conn.close(); - } - System.out.println("DB schema " + ((schemaExists) ? "exists" : "does not exist")); - return schemaExists; - } - - private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception { - System.out.println((create) ? 
"Create SQL schema" : "Upgrade SQL schema"); - String[] args = createMappingToolArguments(sqlFile); - org.apache.openjpa.jdbc.meta.MappingTool.main(args); - if (run) { - args = createMappingToolArguments(null); - org.apache.openjpa.jdbc.meta.MappingTool.main(args); - } - System.out.println("DONE"); - } - - private void showVersion() throws Exception { - System.out.println("Falcon Server version: " - + BuildProperties.get().getProperty("project.version")); - validateConnection(); - if (!checkDBExists()) { - throw new Exception("Falcon DB doesn't exist"); - } - try { - verifyFalconPropsTable(true); - } catch (Exception ex) { - throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool"); - } - showFalconPropsInfo(); - } - - private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name"; - - private void showFalconPropsInfo() throws Exception { - Connection conn = createConnection(); - Statement st = null; - ResultSet rs = null; - try { - System.out.println("Falcon DB Version Information"); - System.out.println("--------------------------------------"); - st = conn.createStatement(); - rs = st.executeQuery(GET_FALCON_PROPS_INFO); - while (rs.next()) { - System.out.println(rs.getString(1) + ": " + rs.getString(2)); - } - System.out.println("--------------------------------------"); - } catch (Exception ex) { - throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex); - } finally { - closeResultSet(rs); - closeStatement(st); - conn.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/scheduler/src/main/resources/META-INF/persistence.xml b/scheduler/src/main/resources/META-INF/persistence.xml deleted file mode 100644 index c2ef794..0000000 --- a/scheduler/src/main/resources/META-INF/persistence.xml +++ 
/dev/null @@ -1,104 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<persistence xmlns="http://java.sun.com/xml/ns/persistence" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - version="1.0"> - - <persistence-unit name="falcon-derby" transaction-type="RESOURCE_LOCAL"> - <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider> - - <class>org.apache.falcon.state.store.jdbc.EntityBean</class> - <class>org.apache.falcon.state.store.jdbc.InstanceBean</class> - - <properties> - <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> - - <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time --> - - <property name="openjpa.MetaDataFactory" - value="jpa(Types=org.apache.falcon.state.store.EntityBean; - org.apache.falcon.state.store.InstanceBean)"></property> - - <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> - <property name="openjpa.LockManager" value="pessimistic"/> - <property name="openjpa.ReadLockLevel" value="read"/> - <property name="openjpa.WriteLockLevel" value="write"/> - <property name="openjpa.jdbc.TransactionIsolation" 
value="read-committed"/> <!--CUSTOM--> - <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/> - <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/> - <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/> - <property name="openjpa.Log" value="log4j"/> - </properties> - </persistence-unit> - - <persistence-unit name="falcon-mysql" transaction-type="RESOURCE_LOCAL"> - <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider> - - <class>org.apache.falcon.state.store.jdbc.EntityBean</class> - <class>org.apache.falcon.state.store.jdbc.InstanceBean</class> - - <properties> - <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> - - <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time --> - - <property name="openjpa.MetaDataFactory" - value="jpa(Types=org.apache.falcon.state.store.EntityBean; - org.apache.falcon.state.store.InstanceBean)"></property> - - <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> - <property name="openjpa.LockManager" value="pessimistic"/> - <property name="openjpa.ReadLockLevel" value="read"/> - <property name="openjpa.WriteLockLevel" value="write"/> - <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM--> - <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/> - <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/> - <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/> - <property name="openjpa.Log" value="log4j"/> - </properties> - </persistence-unit> - - <persistence-unit name="falcon-postgresql" transaction-type="RESOURCE_LOCAL"> - <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider> - - <class>org.apache.falcon.state.store.jdbc.EntityBean</class> - <class>org.apache.falcon.state.store.jdbc.InstanceBean</class> - - 
<properties> - <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> - - <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time --> - - <property name="openjpa.MetaDataFactory" - value="jpa(Types=org.apache.falcon.state.store.EntityBean; - org.apache.falcon.state.store.InstanceBean)"></property> - - <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> - <property name="openjpa.LockManager" value="pessimistic"/> - <property name="openjpa.ReadLockLevel" value="read"/> - <property name="openjpa.WriteLockLevel" value="write"/> - <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM--> - <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/> - <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/> - <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/> - <property name="openjpa.Log" value="log4j"/> - </properties> - </persistence-unit> - -</persistence> http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java index 417ec3e..437c5f5 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java @@ -45,7 +45,7 @@ import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.StateStore; -import org.apache.falcon.state.store.service.FalconJPAService; +import 
org.apache.falcon.service.FalconJPAService; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.engine.DAGEngine; import org.apache.falcon.workflow.engine.DAGEngineFactory; http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java index 155be69..cd99049 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java +++ b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java @@ -18,7 +18,7 @@ package org.apache.falcon.state; import org.apache.falcon.entity.AbstractTestBase; -import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.tools.FalconStateStoreDBCLI; import org.apache.falcon.util.StateStoreProperties; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java index ecd5293..3e186dd 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java +++ b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java @@ -19,7 +19,7 @@ package org.apache.falcon.state.service; import org.apache.falcon.FalconException; import org.apache.falcon.state.AbstractSchedulerTestBase; -import org.apache.falcon.state.store.service.FalconJPAService; +import 
org.apache.falcon.service.FalconJPAService; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java index d597e27..bf5c142 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java +++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java @@ -44,7 +44,7 @@ import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.store.jdbc.BeanMapperUtil; import org.apache.falcon.state.store.jdbc.JDBCStateStore; import org.apache.falcon.state.store.StateStore; -import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.engine.DAGEngine; import org.apache.falcon.workflow.engine.DAGEngineFactory; http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/resources/startup.properties ---------------------------------------------------------------------- diff --git a/scheduler/src/test/resources/startup.properties b/scheduler/src/test/resources/startup.properties index 7160bb2..6216b70 100644 --- a/scheduler/src/test/resources/startup.properties +++ b/scheduler/src/test/resources/startup.properties @@ -41,7 +41,7 @@ org.apache.falcon.notification.service.impl.AlarmService,\ org.apache.falcon.notification.service.impl.DataAvailabilityService,\ org.apache.falcon.execution.FalconExecutionService,\ - org.apache.falcon.state.store.service.FalconJPAService + 
org.apache.falcon.service.FalconJPAService ##### Falcon Configuration Store Change listeners ##### *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ @@ -121,7 +121,8 @@ debug.libext.process.paths=${falcon.libext} *.falcon.http.authentication.simple.anonymous.allowed=false # Indicates the Kerberos principal to be used for HTTP endpoint. -# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification. +# The principal MUST sta +#rt with 'HTTP/' as per Kerberos HTTP SPNEGO specification. *.falcon.http.authentication.kerberos.principal= # Location of the keytab file with the credentials for the HTTP principal. http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/resources/statestore.properties ---------------------------------------------------------------------- diff --git a/scheduler/src/test/resources/statestore.properties b/scheduler/src/test/resources/statestore.properties index 2ae642f..e7a08fc 100644 --- a/scheduler/src/test/resources/statestore.properties +++ b/scheduler/src/test/resources/statestore.properties @@ -18,7 +18,7 @@ *.domain=debug -######## StateStore Properties ##### +######### StateStore Properties ##### *.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore *.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver *.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/src/build/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml index 78f2fd0..a6766df 100644 --- a/src/build/findbugs-exclude.xml +++ b/src/build/findbugs-exclude.xml @@ -38,16 +38,37 @@ </Match> <Match> - <Class name="org.apache.falcon.state.store.jdbc.EntityBean" /> + <Class name="org.apache.falcon.persistence.EntityBean" /> <Bug pattern="NP_BOOLEAN_RETURN_NULL" /> </Match> <Match> - <Class 
name="org.apache.falcon.state.store.jdbc.InstanceBean" /> + <Class name="org.apache.falcon.persistence.InstanceBean" /> <Bug pattern="NP_BOOLEAN_RETURN_NULL" /> </Match> <Match> + <Class name="org.apache.falcon.persistence.PendingInstanceBean" /> + <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" /> + </Match> + + <!--<Match>--> + <!--<Class name="org.apache.falcon.persistence.PendingInstanceBean" />--> + <!--<Bug pattern="UWF_UNWRITTEN_FIELD" />--> + <!--</Match>--> + + + <Match> + <Class name="org.apache.falcon.persistence.MonitoredFeedsBean" /> + <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" /> + </Match> + + <!--<Match>--> + <!--<Class name="org.apache.falcon.persistence.MonitoredFeedsBean" />--> + <!--<Bug pattern="UWF_UNWRITTEN_FIELD" />--> + <!--</Match>--> + + <Match> <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT" /> </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index f23337b..3601e22 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -59,7 +59,7 @@ # org.apache.falcon.service.ProcessSubscriberService,\ # org.apache.falcon.service.FeedSLAMonitoringService,\ # org.apache.falcon.service.LifecyclePolicyMap,\ -# org.apache.falcon.state.store.service.FalconJPAService,\ +# org.apache.falcon.service.FalconJPAService,\ # org.apache.falcon.entity.store.ConfigurationStore,\ # org.apache.falcon.rerun.service.RetryService,\ # org.apache.falcon.rerun.service.LateRunService,\ http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/unit/pom.xml ---------------------------------------------------------------------- diff --git a/unit/pom.xml b/unit/pom.xml index 7e5b073..f1ef463 100644 --- a/unit/pom.xml +++ b/unit/pom.xml @@ -44,6 +44,16 @@ <dependency> <groupId>org.apache.oozie</groupId> 
<artifactId>oozie-core</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.openjpa</groupId> + <artifactId>openjpa-jdbc</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.openjpa</groupId> + <artifactId>openjpa-persistence</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/unit/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/startup.properties b/unit/src/main/resources/startup.properties index 4576e0b..4dfea31 100644 --- a/unit/src/main/resources/startup.properties +++ b/unit/src/main/resources/startup.properties @@ -33,6 +33,7 @@ *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\ org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ org.apache.falcon.service.ProcessSubscriberService,\ + org.apache.falcon.service.FalconJPAService,\ org.apache.falcon.entity.store.ConfigurationStore,\ org.apache.falcon.rerun.service.RetryService,\ org.apache.falcon.rerun.service.LateRunService,\ http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index aaf2b37..9b1ff2a 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -133,6 +133,7 @@ public class TestFalconUnit extends FalconUnitTestBase { ParseException, InterruptedException { // submit cluster and feeds submitClusterAndFeeds(); + APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH); assertStatus(result); createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME); 
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java index 175833a..1bd4f45 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java @@ -24,7 +24,7 @@ import org.apache.falcon.client.FalconCLIException; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.state.AbstractSchedulerTestBase; -import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.unit.FalconUnitTestBase; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.util.StateStoreProperties;
