Repository: falcon Updated Branches: refs/heads/master 08399d038 -> 14b1bb8f4
FALCON-1886 Feed sla monitoring does not work across restarts. Removed redundant fields. Added unit tests. Author: Ajay Yadava <[email protected]> Reviewers: Praveen Adlakha Closes #86 from ajayyadava/1886 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/14b1bb8f Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/14b1bb8f Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/14b1bb8f Branch: refs/heads/master Commit: 14b1bb8f4dc6cf5c85957ea0981cbaff0b490ff3 Parents: 08399d0 Author: Ajay Yadava <[email protected]> Authored: Wed Apr 6 15:55:18 2016 +0530 Committer: Ajay Yadava <[email protected]> Committed: Wed Apr 6 15:55:18 2016 +0530 ---------------------------------------------------------------------- .../falcon/persistence/PendingInstanceBean.java | 11 +- .../persistence/PersistenceConstants.java | 1 + .../falcon/jdbc/MonitoringJdbcStateStore.java | 20 ++-- .../service/FeedSLAMonitoringService.java | 113 +++++++------------ .../jdbc/MonitoringJdbcStateStoreTest.java | 55 +++++++-- .../falcon/service/FeedSLAMonitoringTest.java | 6 +- 6 files changed, 113 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java index 038244a..108001d 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java @@ -35,11 +35,12 @@ import java.util.Date; * */ @Entity @NamedQueries({ - @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"), - @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"), - @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), - @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), - @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a ") + @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.feedName = :feedName"), + @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"), + @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"), + @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), + @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), + @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a ") }) @Table(name = "PENDING_INSTANCES") //RESUME CHECKSTYLE CHECK LineLengthCheck http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index e554581..44edc7c 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -53,4 +53,5 @@ public final class PersistenceConstants { public static final String GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER"; public static final String DELETE_INSTANCES_TABLE = "DELETE_INSTANCES_TABLE"; public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE"; + public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index 39e2562..6345d44 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -94,6 +94,15 @@ public class MonitoringJdbcStateStore { return result; } + public Date getLastInstanceTime(String feedName) throws ResultNotFoundException { + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_LATEST_INSTANCE_TIME, Date.class); + q.setParameter("feedName", feedName); + Date result = (Date)q.getSingleResult(); + entityManager.close(); + return result; + } + public void deletePendingInstance(String feedName, String clusterName , Date nominalTime){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); @@ -133,21 +142,16 @@ public class MonitoringJdbcStateStore { commitAndCloseTransaction(entityManager); } - public List<Date> getNominalInstances(String feedName, String clusterName) throws ResultNotFoundException{ + public List<Date> getNominalInstances(String feedName, String clusterName) { EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES); q.setParameter("feedName", feedName); q.setParameter("clusterName", clusterName); List result = q.getResultList(); - try{ - if (CollectionUtils.isEmpty(result)) { - throw new ResultNotFoundException(feedName + " with " + clusterName + "Not Found"); - } - } finally { - entityManager.close(); - } + entityManager.close(); return result; } + public List<PendingInstanceBean> getAllInstances(){ EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES); http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index b5a2569..9de4463 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -17,6 +17,14 @@ */ package org.apache.falcon.service; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.commons.collections.CollectionUtils; import org.apache.falcon.FalconException; import org.apache.falcon.Pair; @@ -35,24 +43,18 @@ import org.apache.falcon.jdbc.MonitoringJdbcStateStore; import org.apache.falcon.persistence.MonitoredFeedsBean; import org.apache.falcon.persistence.PendingInstanceBean; import org.apache.falcon.resource.SchedulableEntityInstance; +import org.apache.falcon.util.DateUtil; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.StartupProperties; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.ArrayList; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; /** * Service to monitor Feed SLAs. @@ -62,8 +64,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore(); - private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000); - private static final int ONE_MS = 1; private static final FeedSLAMonitoringService SERVICE = new FeedSLAMonitoringService(); @@ -76,17 +76,11 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen return SERVICE; } - protected int queueSize; - /** * Permissions for storePath. */ private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); - /** - * Used to store the last time when pending instances were checked for SLA. - */ - private Date lastCheckedAt; /** * Frequency in seconds of "status check" for pending feed instances. @@ -103,11 +97,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen /** - * Frequency in milliseconds of serializing(for backup) monitoring service's state. - */ - private int serializationFrequencyMillis; - - /** * Filesystem used for serializing and deserializing. */ private FileSystem fileSystem; @@ -212,21 +201,12 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen filePath = new Path(storePath, "feedSLAMonitoringService"); fileSystem = initializeFileSystem(); - String freq = StartupProperties.get().getProperty("feed.sla.serialization.frequency.millis", ONE_HOUR); - serializationFrequencyMillis = Integer.parseInt(freq); - - freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600"); + String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600"); statusCheckFrequencySeconds = Integer.parseInt(freq); freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000"); lookAheadWindowMillis = Integer.parseInt(freq); - - String size = StartupProperties.get().getProperty("feed.sla.queue.size", "288"); - queueSize = Integer.parseInt(size); - LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString()); - initializeService(); - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS); } @@ -272,8 +252,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen // add Instances from last checked time to 10 minutes from now(some buffer for status check) Date now = new Date(); Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis); - addNewPendingFeedInstances(lastCheckedAt, newCheckPoint); - lastCheckedAt = newCheckPoint; + addNewPendingFeedInstances(newCheckPoint); } } catch (Throwable e) { LOG.error("Feed SLA monitoring failed: ", e); @@ -282,7 +261,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen } - void addNewPendingFeedInstances(Date from, Date to) throws FalconException { + void addNewPendingFeedInstances(Date to) throws FalconException { Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); List<MonitoredFeedsBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed(); for(MonitoredFeedsBean monitoredFeedsBean : feedsBeanList) { @@ -290,41 +269,27 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); for (Cluster feedCluster : feed.getClusters().getClusters()) { if (currentClusters.contains(feedCluster.getName())) { - Date nextInstanceTime = from; + // get start of instances from the database + Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(feedName); Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName()); - BlockingQueue<Date> instances = new LinkedBlockingQueue<>( - MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, feedCluster.getName())); - if (CollectionUtils.isEmpty(MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, - feedCluster.getName()))) { - instances = new LinkedBlockingQueue<>(queueSize); - Date feedStartTime = feedCluster.getValidity().getStart(); - Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, feedCluster); - ExpressionHelper evaluator = ExpressionHelper.get(); - ExpressionHelper.setReferenceDate(new Date()); - Date retention = new Date(evaluator.evaluate(retentionFrequency.toString(), Long.class)); - if (feedStartTime.before(retention)) { - feedStartTime = retention; - } - nextInstanceTime = feedStartTime; + if (nextInstanceTime == null) { + nextInstanceTime = getInitialStartTime(feed, feedCluster.getName()); + } else { + nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); } - Set<Date> exists = new HashSet<>(instances); + + Set<Date> instances = new HashSet<>(); org.apache.falcon.entity.v0.cluster.Cluster currentCluster = EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName()); nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime); Date endDate = FeedHelper.getClusterValidity(feed, currentCluster.getName()).getEnd(); while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) { - if (instances.size() >= queueSize) { // if no space, first make some space - LOG.debug("Removing instance={} for <feed,cluster>={}", instances.peek(), key); - exists.remove(instances.peek()); - instances.remove(); - } LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key); - if (exists.add(nextInstanceTime)) { - instances.add(nextInstanceTime); - } + instances.add(nextInstanceTime); nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime); } + for(Date date:instances){ MONITORING_JDBC_STATE_STORE.putPendingInstances(feed.getName(), feedCluster.getName(), date); } @@ -376,10 +341,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen } - protected void initializeService() { - lastCheckedAt = new Date(); - } - /** * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range which have missed * slaLow or slaHigh. @@ -403,11 +364,11 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen Sla sla = FeedHelper.getSLA(cluster, feed); if (sla != null) { Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, - new LinkedBlockingQueue<Date>(MONITORING_JDBC_STATE_STORE.getNominalInstances( - pendingInstanceBean.getFeedName(), pendingInstanceBean.getClusterName()))); - for (Pair<Date, String> status : slaStatus){ + MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(), + pendingInstanceBean.getClusterName())); + for (Pair<Date, String> status : slaStatus) { SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first, - feedClusterPair.second, status.first, EntityType.FEED); + feedClusterPair.second, status.first, EntityType.FEED); instance.setTags(status.second); result.add(instance); } @@ -431,8 +392,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen Set<SchedulableEntityInstance> result = new HashSet<>(); Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName); - BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>(MONITORING_JDBC_STATE_STORE. - getNominalInstances(feedName, clusterName)); + List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); Sla sla = FeedHelper.getSLA(cluster, feed); @@ -448,7 +408,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen return result; } - Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, BlockingQueue<Date> missingInstances) + Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, List<Date> missingInstances) throws FalconException { String tagCritical = "Missed SLA High"; String tagWarn = "Missed SLA Low"; @@ -473,4 +433,17 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen } return result; } + + @VisibleForTesting + Date getInitialStartTime(Feed feed, String clusterName) throws FalconException { + Sla sla = FeedHelper.getSLA(clusterName, feed); + if (sla == null) { + throw new IllegalStateException("InitialStartTime can not be determined as the feed: " + + feed.getName() + " and cluster: " + clusterName + " does not have any sla"); + } + Date startTime = FeedHelper.getFeedValidityStart(feed, clusterName); + Frequency slaLow = sla.getSlaLow(); + Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow)); + return startTime.before(slaTime) ? startTime : slaTime; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java index aa32167..b43025d 100644 --- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -17,6 +17,12 @@ */ package org.apache.falcon.jdbc; +import java.io.File; +import java.util.Date; + +import javax.persistence.EntityManager; +import javax.persistence.Query; + import org.apache.falcon.cluster.util.EmbeddedCluster; import org.apache.falcon.entity.AbstractTestBase; import org.apache.falcon.entity.v0.SchemaHelper; @@ -26,14 +32,12 @@ import org.apache.falcon.util.StateStoreProperties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; + import org.testng.Assert; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.io.File; -import java.util.Date; -import java.util.Random; - /** *Unit test for MonitoringJdbcStateStore. * */ @@ -45,7 +49,7 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; protected LocalFileSystem fs = new LocalFileSystem(); - private static Random randomValGenerator = new Random(); + private static MonitoringJdbcStateStore monitoringJdbcStateStore; private static FalconJPAService falconJPAService = FalconJPAService.get(); protected int execDBCLICommands(String[] args) { @@ -71,12 +75,16 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { falconJPAService.init(); this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); this.conf = dfsCluster.getConf(); + monitoringJdbcStateStore = new MonitoringJdbcStateStore(); + } + + @BeforeMethod + public void init() { + clear(); } @Test public void testInsertRetrieveAndUpdate() throws Exception { - - MonitoringJdbcStateStore monitoringJdbcStateStore = new MonitoringJdbcStateStore(); monitoringJdbcStateStore.putMonitoredFeed("test_feed1"); monitoringJdbcStateStore.putMonitoredFeed("test_feed2"); Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName()); @@ -94,4 +102,37 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 1); monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster"); } + + @Test + public void testEmptyLatestInstance() throws Exception { + MonitoringJdbcStateStore store = new MonitoringJdbcStateStore(); + store.putMonitoredFeed("test-feed1"); + store.putMonitoredFeed("test-feed2"); + Assert.assertNull(store.getLastInstanceTime("test-feed1")); + + Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); + Date dateTwo = SchemaHelper.parseDateUTC("2015-11-20T01:00Z"); + + store.putPendingInstances("test-feed1", "test_cluster", dateTwo); + store.putPendingInstances("test-feed1", "test_cluster", dateOne); + store.putPendingInstances("test-feed2", "test_cluster", dateOne); + + Assert.assertTrue(dateTwo.equals(store.getLastInstanceTime("test-feed1"))); + Assert.assertTrue(dateOne.equals(store.getLastInstanceTime("test-feed2"))); + + } + + private void clear() { + EntityManager em = FalconJPAService.get().getEntityManager(); + em.getTransaction().begin(); + try { + Query query = em.createNativeQuery("delete from MONITORED_FEEDS"); + query.executeUpdate(); + query = em.createNativeQuery("delete from PENDING_INSTANCES"); + query.executeUpdate(); + } finally { + em.getTransaction().commit(); + em.close(); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java index b739037..dbe0cf4 100644 --- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java +++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java @@ -21,12 +21,12 @@ package org.apache.falcon.service; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Date; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.TimeZone; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.falcon.FalconException; import org.apache.falcon.Pair; @@ -65,7 +65,7 @@ public class FeedSLAMonitoringTest extends AbstractTestBase { Date start = SchemaHelper.parseDateUTC("2014-05-05T00:00Z"); Date end = SchemaHelper.parseDateUTC("2015-05-05T00:00Z"); - BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>(); + List<Date> missingInstances = new ArrayList<>(); missingInstances.add(SchemaHelper.parseDateUTC("2013-05-05T00:00Z")); // before start time missingInstances.add(SchemaHelper.parseDateUTC("2014-05-05T00:00Z")); // equal to start time missingInstances.add(SchemaHelper.parseDateUTC("2014-05-06T00:00Z")); // in between
