Repository: falcon
Updated Branches:
  refs/heads/master 2bea7f474 -> 7354f870b
FALCON-2096 Feed instance listing doesn't give instance status of all the clusters

Author: sandeep <[email protected]>

Reviewers: @pallavi-rao, @PraveenAdlakha

Closes #261 from sandeepSamudrala/FALCON-2096 and squashes the following commits:

c9030e6 [sandeep] FALCON-2096. Fixed checkstyle issues
4c9c655 [sandeep] FALCON-2096. Incorporated review comments. Extracted frequently used constants
9f0d751 [sandeep] Falcon 2096. Feed instance listing doesn't give instance status of all the clusters
97d2a8f [sandeep] Falcon 2096. Feed instance listing doesn't give instance status of all the clusters
d853c76 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2096
fa8f8ad [sandeep] FALCON-2096. Feed instance listing doesn't give instance status of all the clusters.
89def80 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2096
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes

Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7354f870
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7354f870
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7354f870

Branch: refs/heads/master
Commit: 7354f870b2bff7b5f0451a792cb06d2eee00ff38
Parents: 2bea7f4
Author: sandeep <[email protected]>
Authored: Wed Aug 17 09:27:31 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Wed Aug 17 09:27:31 2016 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/entity/FeedHelper.java | 9 +-
 .../falcon/entity/FileSystemStorageTest.java | 163 ++++++++++++-------
 2 files changed, 111 insertions(+), 61 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/falcon/blob/7354f870/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff 
--git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index ea34d34..757359f 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -803,12 +803,11 @@ public final class FeedHelper { Date start, Date end) throws FalconException { Set<String> clusters = EntityUtil.getClustersDefinedInColos(entityObject); FeedInstanceResult result = new FeedInstanceResult(APIResult.Status.SUCCEEDED, "Success"); + List<FeedInstanceResult.Instance> allInstances = new ArrayList<FeedInstanceResult.Instance>(); for (String cluster : clusters) { Feed feed = (Feed) entityObject; Storage storage = createStorage(cluster, feed); List<FeedInstanceStatus> feedListing = storage.getListing(feed, cluster, LocationType.DATA, start, end); - FeedInstanceResult.Instance[] instances = new FeedInstanceResult.Instance[feedListing.size()]; - int index = 0; for (FeedInstanceStatus feedStatus : feedListing) { FeedInstanceResult.Instance instance = new FeedInstanceResult.Instance(cluster, feedStatus.getInstance(), @@ -817,10 +816,12 @@ public final class FeedHelper { instance.uri = feedStatus.getUri(); instance.size = feedStatus.getSize(); instance.sizeH = feedStatus.getSizeH(); - instances[index++] = instance; + allInstances.add(instance); } - result.setInstances(instances); } + FeedInstanceResult.Instance[] resultInstances = allInstances.toArray( + new FeedInstanceResult.Instance[allInstances.size()]); + result.setInstances(resultInstances); return result; } http://git-wip-us.apache.org/repos/asf/falcon/blob/7354f870/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java index 30edd94..98b0f8e 
100644 --- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java +++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java @@ -34,6 +34,7 @@ import org.apache.falcon.entity.v0.feed.Locations; import org.apache.falcon.entity.v0.feed.Validity; import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.util.FalconTestUtil; import org.apache.hadoop.fs.FileStatus; @@ -60,6 +61,8 @@ import java.util.TimeZone; public class FileSystemStorageTest { private static final String USER = FalconTestUtil.TEST_USER_1; + private static final String TEST_FEED_LISTING = "TestFeedListing"; + private static final String TEST_FEED_INSTANCE_LISTING = "TestFeedInstanceListing"; @BeforeClass public void setUp() { @@ -424,7 +427,7 @@ public class FileSystemStorageTest { @Test (dataProvider = "testListingDataProvider") public void testListing(String availabilityFlag, Frequency frequency, TimeZone timeZone, Date start, Date end) throws Exception { - EmbeddedCluster cluster = EmbeddedCluster.newCluster("TestFeedListing", false); + EmbeddedCluster cluster = EmbeddedCluster.newCluster(TEST_FEED_LISTING, false); FileSystem fs = cluster.getFileSystem(); ConfigurationStore.get().publish(EntityType.CLUSTER, cluster.getCluster()); try { @@ -433,79 +436,125 @@ public class FileSystemStorageTest { FileSystemStorage fileSystemStorage = new FileSystemStorage(cluster.getFileSystem(). getUri().toString(), feed.getLocations()); List<FeedInstanceStatus> actual = fileSystemStorage. 
- getListing(feed, "TestFeedListing", LocationType.DATA, start, end); + getListing(feed, TEST_FEED_LISTING, LocationType.DATA, start, end); Assert.assertEquals(actual, expected, "Feed instance Listings doesn't match"); } finally { ConfigurationStore.get().remove(EntityType.CLUSTER, cluster.getCluster().getName()); } } + @Test (dataProvider = "testListingDataProvider") + public void testInstanceListing(String availabilityFlag, Frequency frequency, TimeZone timeZone, + Date start, Date end) throws Exception { + EmbeddedCluster firstCluster = EmbeddedCluster.newCluster(TEST_FEED_LISTING, false); + FileSystem fs = firstCluster.getFileSystem(); + ConfigurationStore.get().publish(EntityType.CLUSTER, firstCluster.getCluster()); + + + EmbeddedCluster secondCluster = EmbeddedCluster.newCluster(TEST_FEED_INSTANCE_LISTING, false); + ConfigurationStore.get().publish(EntityType.CLUSTER, secondCluster.getCluster()); + + try { + Feed feed = getFeed(availabilityFlag, frequency, timeZone); + Cluster cluster = new Cluster(); + cluster.setName(TEST_FEED_INSTANCE_LISTING); + feed.getClusters().getClusters().add(cluster); + Validity validity = new Validity(); + cluster.setValidity(validity); + validity.setStart(new Date(System.currentTimeMillis() - (1000L * 24 * 3600000))); + validity.setEnd(new Date(System.currentTimeMillis() - (1000L * 21 * 3600000))); + Locations locations = new Locations(); + Location dataLocation = new Location(); + dataLocation.setPath("/TestFeedInstanceListing/data/${YEAR}/${MONTH}/${DAY}" + + (frequency.getTimeUnit() == Frequency.TimeUnit.hours ? 
"/${HOUR}" : "") + "/MORE"); + dataLocation.setType(LocationType.DATA); + locations.getLocations().add(dataLocation); + cluster.setLocations(locations); + + List<FeedInstanceStatus> expected = prepareData(fs, feed, start, end); + + FeedInstanceResult actual = FeedHelper.getFeedInstanceListing(feed, start, end); + Assert.assertEquals(actual.getInstances().length, expected.size()); + } finally { + ConfigurationStore.get().remove(EntityType.CLUSTER, firstCluster.getCluster().getName()); + ConfigurationStore.get().remove(EntityType.CLUSTER, secondCluster.getCluster().getName()); + } + + + } + @SuppressWarnings("MagicConstant") private List<FeedInstanceStatus> prepareData(FileSystem fs, Feed feed, Date start, Date end) throws Exception { - fs.delete(new Path("/TestFeedListing"), true); + fs.delete(new Path("/" + TEST_FEED_LISTING), true); Random random = new Random(); List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>(); String basePath = feed.getLocations().getLocations().get(0).getPath(); Frequency frequency = feed.getFrequency(); TimeZone tz = feed.getTimezone(); - Date dataStart = EntityUtil.getNextStartTime(feed.getClusters().getClusters().get(0).getValidity().getStart(), - feed.getFrequency(), tz, new Date(start.getTime())); - Date dataEnd = new Date(end.getTime()); - while (dataStart.before(dataEnd)) { - Properties properties = ExpressionHelper.getTimeVariables(dataStart, tz); - String path = ExpressionHelper.substitute(basePath, properties); - FeedInstanceStatus instance = new FeedInstanceStatus(path); - instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING); - instance.setSize(-1); - instance.setCreationTime(0); - Date date = FeedHelper.getDate(basePath, new Path(path), tz); - instance.setInstance(SchemaHelper.formatDateUTC(date)); - Calendar cal = Calendar.getInstance(); - cal.setTime(dataStart); - cal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt()); - dataStart.setTime(cal.getTimeInMillis()); - if 
(random.nextBoolean()) { - OutputStream out = fs.create(new Path(path, "file")); - out.write("Hello World\n".getBytes()); - out.close(); - instance.setSize(12); - if (feed.getAvailabilityFlag() == null - || (feed.getAvailabilityFlag() != null && random.nextBoolean())) { - //If availability is not present or if ok to create availability file, mark as available - instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE); - if (feed.getAvailabilityFlag() != null) { + for (Cluster cluster : feed.getClusters().getClusters()) { + Date dataStart = EntityUtil.getNextStartTime(cluster.getValidity().getStart(), + feed.getFrequency(), tz, new Date(start.getTime())); + String clusterLocationPath = null; + if (cluster.getLocations() != null && cluster.getLocations().getLocations().get(0).getPath() != null) { + basePath = clusterLocationPath == null ? basePath : clusterLocationPath; + } + Date dataEnd = new Date(end.getTime()); + while (dataStart.before(dataEnd)) { + Properties properties = ExpressionHelper.getTimeVariables(dataStart, tz); + String path = ExpressionHelper.substitute(basePath, properties); + FeedInstanceStatus instance = new FeedInstanceStatus(path); + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING); + instance.setSize(-1); + instance.setCreationTime(0); + Date date = FeedHelper.getDate(basePath, new Path(path), tz); + instance.setInstance(SchemaHelper.formatDateUTC(date)); + Calendar cal = Calendar.getInstance(); + cal.setTime(dataStart); + cal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt()); + dataStart.setTime(cal.getTimeInMillis()); + if (random.nextBoolean()) { + OutputStream out = fs.create(new Path(path, "file")); + out.write("Hello World\n".getBytes()); + out.close(); + instance.setSize(12); + if (feed.getAvailabilityFlag() == null + || (feed.getAvailabilityFlag() != null && random.nextBoolean())) { + //If availability is not present or if ok to create availability file, mark as available + 
instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE); + if (feed.getAvailabilityFlag() != null) { + fs.create(new Path(path, feed.getAvailabilityFlag())).close(); + } + } else if (feed.getAvailabilityFlag() != null) { + //If availability is present or not ok to create availability file, mark as partial + fs.mkdirs(new Path(path)); + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL); + } + } else { + if (feed.getAvailabilityFlag() == null && random.nextBoolean()) { + //If availability is not present or ok to create dir, mark as empty + fs.mkdirs(new Path(path)); + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY); + instance.setSize(0); + } else if (feed.getAvailabilityFlag() != null && random.nextBoolean()) { + //If availability is present and ok to create dir, mark as partial + fs.mkdirs(new Path(path)); + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL); + } else if (feed.getAvailabilityFlag() != null) { + //If availability is present and ok to create empty instance fs.create(new Path(path, feed.getAvailabilityFlag())).close(); + instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY); + instance.setSize(0); } - } else if (feed.getAvailabilityFlag() != null) { - //If availability is present or not ok to create availability file, mark as partial - fs.mkdirs(new Path(path)); - instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL); } - } else { - if (feed.getAvailabilityFlag() == null && random.nextBoolean()) { - //If availability is not present or ok to create dir, mark as empty - fs.mkdirs(new Path(path)); - instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY); - instance.setSize(0); - } else if (feed.getAvailabilityFlag() != null && random.nextBoolean()) { - //If availability is present and ok to create dir, mark as partial - fs.mkdirs(new Path(path)); - instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL); - } else if (feed.getAvailabilityFlag() != null) { - 
//If availability is present and ok to create empty instance - fs.create(new Path(path, feed.getAvailabilityFlag())).close(); - instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY); - instance.setSize(0); + try { + FileStatus fileStatus = fs.getFileStatus(new Path(path)); + instance.setCreationTime(fileStatus.getModificationTime()); + } catch (IOException e) { + //ignore } + instances.add(instance); } - try { - FileStatus fileStatus = fs.getFileStatus(new Path(path)); - instance.setCreationTime(fileStatus.getModificationTime()); - } catch (IOException e) { - //ignore - } - instances.add(instance); } return instances; } @@ -518,12 +567,12 @@ public class FileSystemStorageTest { feed.setLocations(new Locations()); Location dataLocation = new Location(); feed.getLocations().getLocations().add(dataLocation); - dataLocation.setPath("/TestFeedListing/data/${YEAR}/${MONTH}/${DAY}" + dataLocation.setPath("/" + TEST_FEED_LISTING + "/data/${YEAR}/${MONTH}/${DAY}" + (frequency.getTimeUnit() == Frequency.TimeUnit.hours ? "/${HOUR}" : "") + "/MORE"); dataLocation.setType(LocationType.DATA); feed.setClusters(new Clusters()); Cluster cluster = new Cluster(); - cluster.setName("TestFeedListing"); + cluster.setName(TEST_FEED_LISTING); feed.getClusters().getClusters().add(cluster); Validity validity = new Validity(); cluster.setValidity(validity);
