Repository: falcon Updated Branches: refs/heads/master 33d72f77a -> 8c52a9add
FALCON-763 Support feed listing for CatalogStorage. Contributed by Balu Vellanki Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8c52a9ad Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8c52a9ad Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8c52a9ad Branch: refs/heads/master Commit: 8c52a9add412cf91143d5fd4c4285f66d88a04d6 Parents: 33d72f7 Author: bvellanki <[email protected]> Authored: Thu Jan 28 11:48:30 2016 -0800 Committer: bvellanki <[email protected]> Committed: Thu Jan 28 11:48:30 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../apache/falcon/catalog/CatalogPartition.java | 20 ++- .../falcon/catalog/HiveCatalogService.java | 12 ++ .../apache/falcon/entity/CatalogStorage.java | 73 +++++++- .../org/apache/falcon/entity/FeedHelper.java | 18 ++ .../apache/falcon/entity/FileSystemStorage.java | 6 +- .../apache/falcon/entity/FeedHelperTest.java | 12 ++ .../apache/falcon/catalog/CatalogStorageIT.java | 170 +++++++++++++++++++ 8 files changed, 304 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b92d88d..d904624 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,7 +9,10 @@ Trunk FALCON-1495 In instance status list, show all runs for instances when requested by user(Narayan Periwal via Ajay Yadava) FALCON-1230 Data based notification Service to notify execution instances when data becomes available(Pavan Kumar Kolamuri via Ajay Yadava) + IMPROVEMENTS + FALCON-763 Support feed listing for CatalogStorage (Balu Vellanki) + FALCON-1764 Remove temporary folder "localhost" created during tests(Praveen Adlakha via Ajay Yadava) FALCON-1756 Remove PID files on service stop(Deepak Barr via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java index 9e35782..71194c7 100644 --- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java +++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java @@ -34,6 +34,7 @@ public class CatalogPartition { private String outputFormat; private String location; private String serdeInfo; + private long size = -1; protected CatalogPartition() { } @@ -74,6 +75,8 @@ public class CatalogPartition { this.serdeInfo = serdeInfo; } + public void setSize(long size) { this.size = size; } + /** * Gets the database name. * @@ -156,14 +159,21 @@ public class CatalogPartition { return this.values; } + /** + * Gets the size. + * + * @return the size + */ + public long getSize() { return size; } + @Override public String toString() { return "CatalogPartition [" - + (tableName != null ? "tableName=" + tableName + ", " : "tableName=null") - + (databaseName != null ? "dbName=" + databaseName + ", " : "dbName=null") - + (values != null ? "values=" + values + ", " : "values=null") - + "createTime=" + createTime + ", lastAccessTime=" - + lastAccessTime + ", " + "]"; + + (tableName != null ? "tableName=" + tableName + ", " : "tableName=null, ") + + (databaseName != null ? "dbName=" + databaseName + ", " : "dbName=null, ") + + (values != null ? "values=" + values + ", " : "values=null, ") + + "size=" + size + ", " + "createTime=" + createTime + ", lastAccessTime=" + + lastAccessTime + "]"; } } http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java index b988c3e..872f91f 100644 --- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java +++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java @@ -18,6 +18,7 @@ package org.apache.falcon.catalog; +import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.security.SecurityUtil; @@ -47,6 +48,7 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; /** * An implementation of CatalogService that uses Hive Meta Store (HCatalog) @@ -57,6 +59,7 @@ public class HiveCatalogService extends AbstractCatalogService { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class); public static final String CREATE_TIME = "falcon.create_time"; public static final String UPDATE_TIME = "falcon.update_time"; + public static final String PARTITION_DOES_NOT_EXIST = "Partition does not exist"; public static HiveConf createHiveConf(Configuration conf, @@ -291,6 +294,13 @@ public class HiveCatalogService extends AbstractCatalogService { catalogPartition.setSerdeInfo(hCatPartition.getSd().getSerdeInfo().getSerializationLib()); catalogPartition.setCreateTime(hCatPartition.getCreateTime()); catalogPartition.setLastAccessTime(hCatPartition.getLastAccessTime()); + Map<String, String> params = hCatPartition.getParameters(); + if (params != null) { + String size = hCatPartition.getParameters().get("totalSize"); + if (StringUtils.isNotBlank(size)) { + catalogPartition.setSize(Long.parseLong(size)); + } + } return catalogPartition; } @@ -337,6 +347,8 @@ public class HiveCatalogService extends AbstractCatalogService { HiveMetaStoreClient client = createClient(conf, catalogUrl); Partition hCatPartition = client.getPartition(database, tableName, partitionValues); return createCatalogPartition(hCatPartition); + } catch (NoSuchObjectException nsoe) { + throw new FalconException(PARTITION_DOES_NOT_EXIST + ":" + nsoe.getMessage(), nsoe); } catch (Exception e) { throw new FalconException("Exception fetching partition:" + e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java index 143d9b4..c5860c9 100644 --- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java +++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java @@ -23,6 +23,7 @@ import org.apache.falcon.Pair; import org.apache.falcon.catalog.AbstractCatalogService; import org.apache.falcon.catalog.CatalogPartition; import org.apache.falcon.catalog.CatalogServiceFactory; +import org.apache.falcon.catalog.HiveCatalogService; import org.apache.falcon.entity.common.FeedDataPath; import org.apache.falcon.entity.v0.AccessControlList; import org.apache.falcon.entity.v0.cluster.Cluster; @@ -381,15 +382,79 @@ public class CatalogStorage extends Configured implements Storage { } @Override - public List<FeedInstanceStatus> getListing(Feed feed, String cluster, LocationType locationType, + public List<FeedInstanceStatus> getListing(Feed feed, String clusterName, LocationType locationType, Date start, Date end) throws FalconException { - throw new UnsupportedOperationException("getListing"); + try { + List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>(); + Date feedStart = FeedHelper.getFeedValidityStart(feed, clusterName); + Date alignedDate = EntityUtil.getNextStartTime(feedStart, feed.getFrequency(), + feed.getTimezone(), start); + + while (!end.before(alignedDate)) { + List<String> partitionValues = getCatalogPartitionValues(alignedDate); + try { + CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition( + getConf(), getCatalogUrl(), getDatabase(), getTable(), partitionValues); + instances.add(getFeedInstanceFromCatalogPartition(partition)); + } catch (FalconException e) { + if (e.getMessage().startsWith(HiveCatalogService.PARTITION_DOES_NOT_EXIST)) { + // Partition missing + FeedInstanceStatus instanceStatus = new FeedInstanceStatus(null); + instanceStatus.setInstance(partitionValues.toString()); + instances.add(instanceStatus); + } else { + throw e; + } + } + alignedDate = FeedHelper.getNextFeedInstanceDate(alignedDate, feed); + } + return instances; + } catch (Exception e) { + LOG.error("Unable to retrieve listing for {}:{} -- {}", locationType, catalogUrl, e.getMessage()); + throw new FalconException("Unable to retrieve listing for (URI " + catalogUrl + ")", e); + } + } + + private List<String> getCatalogPartitionValues(Date alignedDate) throws FalconException { + List<String> partitionValues = new ArrayList<String>(); + for (Map.Entry<String, String> entry : getPartitions().entrySet()) { + if (FeedDataPath.PATTERN.matcher(entry.getValue()).find()) { + ExpressionHelper.setReferenceDate(alignedDate); + ExpressionHelper expressionHelper = ExpressionHelper.get(); + String instanceValue = expressionHelper.evaluateFullExpression(entry.getValue(), String.class); + partitionValues.add(instanceValue); + } else { + partitionValues.add(entry.getValue()); + } + } + return partitionValues; + } + + private FeedInstanceStatus getFeedInstanceFromCatalogPartition(CatalogPartition partition) { + FeedInstanceStatus feedInstanceStatus = new FeedInstanceStatus(partition.getLocation()); + feedInstanceStatus.setCreationTime(partition.getCreateTime()); + feedInstanceStatus.setInstance(partition.getValues().toString()); + FeedInstanceStatus.AvailabilityStatus availabilityStatus = FeedInstanceStatus.AvailabilityStatus.MISSING; + long size = partition.getSize(); + if (size == 0) { + availabilityStatus = FeedInstanceStatus.AvailabilityStatus.EMPTY; + } else if (size > 0) { + availabilityStatus = FeedInstanceStatus.AvailabilityStatus.AVAILABLE; + } + feedInstanceStatus.setSize(size); + feedInstanceStatus.setStatus(availabilityStatus); + return feedInstanceStatus; } @Override public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName, - LocationType locationType, Date instancetime) throws FalconException { - throw new UnsupportedOperationException("getInstanceAvailabilityStatus"); //TODO to be implemented later + LocationType locationType, Date instanceTime) throws FalconException { + List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime); + if (result.isEmpty()) { + return FeedInstanceStatus.AvailabilityStatus.MISSING; + } else { + return result.get(0).getStatus(); + } } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index 8aa97ec..b3aaaab 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -408,6 +408,24 @@ public final class FeedHelper { return null; } + public static Date getFeedValidityStart(Feed feed, String clusterName) throws FalconException { + org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName); + if (feedCluster != null) { + return feedCluster.getValidity().getStart(); + } else { + throw new FalconException("No matching cluster " + clusterName + + "found for feed " + feed.getName()); + } + } + + public static Date getNextFeedInstanceDate(Date alignedDate, Feed feed) { + Calendar calendar = Calendar.getInstance(); + calendar.setTime(alignedDate); + calendar.add(feed.getFrequency().getTimeUnit().getCalendarUnit(), + feed.getFrequency().getFrequencyAsInt()); + return calendar.getTime(); + } + /** * Returns various policies applicable for a feed. * http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java index 200f71f..ece8b5d 100644 --- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java +++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java @@ -466,7 +466,11 @@ public class FileSystemStorage extends Configured implements Storage { Date instanceTime) throws FalconException { List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime); - return result.get(0).getStatus(); + if (result.isEmpty()) { + return FeedInstanceStatus.AvailabilityStatus.MISSING; + } else { + return result.get(0).getStatus(); + } } public FileStatus getFileStatus(FileSystem fileSystem, Path feedInstancePath) { http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java index d565f94..95d10c4 100644 --- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java +++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java @@ -53,6 +53,7 @@ import org.apache.falcon.entity.v0.process.Outputs; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.resource.SchedulableEntityInstance; import org.apache.falcon.service.LifecyclePolicyMap; +import org.apache.falcon.util.DateUtil; import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.BeforeClass; @@ -198,6 +199,17 @@ public class FeedHelperTest extends AbstractTestBase { } @Test + public void testGetFeedValidityStartAndNextInstance() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); + Date date = FeedHelper.getFeedValidityStart(feed, cluster.getName()); + Assert.assertEquals(DateUtil.getDateFormatFromTime(date.getTime()), "2011-02-28T10:00Z"); + Date nextDate = FeedHelper.getNextFeedInstanceDate(date, feed); + Assert.assertEquals(DateUtil.getDateFormatFromTime(nextDate.getTime()), "2011-02-28T10:05Z"); + } + + + @Test public void testGetConsumersFirstInstance() throws Exception { Cluster cluster = publishCluster(); Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); http://git-wip-us.apache.org/repos/asf/falcon/blob/8c52a9ad/webapp/src/test/java/org/apache/falcon/catalog/CatalogStorageIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/catalog/CatalogStorageIT.java b/webapp/src/test/java/org/apache/falcon/catalog/CatalogStorageIT.java new file mode 100644 index 0000000..bf3b2ec --- /dev/null +++ b/webapp/src/test/java/org/apache/falcon/catalog/CatalogStorageIT.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.catalog; + +import org.apache.falcon.entity.CatalogStorage; +import org.apache.falcon.entity.FeedInstanceStatus; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.cluster.Interface; +import org.apache.falcon.entity.v0.cluster.Interfaces; +import org.apache.falcon.entity.v0.cluster.Interfacetype; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Clusters; +import org.apache.falcon.entity.v0.feed.Validity; +import org.apache.falcon.entity.v0.feed.CatalogTable; +import org.apache.falcon.entity.v0.feed.ClusterType; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.resource.TestContext; +import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.util.HiveTestUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.hcatalog.api.HCatAddPartitionDesc; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.common.HCatException; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.TimeZone; + + +/** + * Tests Hive Meta Store service. + */ +public class CatalogStorageIT { + + private static final String METASTORE_URL = "thrift://localhost:49083"; + private static final String DATABASE_NAME = "CatalogStorageITDB"; + private static final String TABLE_NAME = "CatalogStorageITTable"; + + private HCatClient client; + private Feed feed = new Feed(); + private org.apache.falcon.entity.v0.cluster.Cluster cluster = new org.apache.falcon.entity.v0.cluster.Cluster(); + private DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z"); + private CatalogStorage storage; + + @BeforeClass + public void setUp() throws Exception { + // setup a logged in user + CurrentUser.authenticate(TestContext.REMOTE_USER); + client = TestContext.getHCatClient(METASTORE_URL); + + HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME); + List<String> partitionKeys = new ArrayList<String>(); + partitionKeys.add("ds"); + partitionKeys.add("region"); + HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys); + addPartitions(); + addClusterAndFeed(); + } + + private void addClusterAndFeed() throws Exception { + cluster.setName("testCluster"); + Interfaces interfaces = new Interfaces(); + Interface registry = new Interface(); + registry.setType(Interfacetype.REGISTRY); + registry.setEndpoint(METASTORE_URL); + interfaces.getInterfaces().add(registry); + cluster.setInterfaces(interfaces); + + feed.setName("feed"); + Frequency f = new Frequency("days(1)"); + feed.setFrequency(f); + feed.setTimezone(TimeZone.getTimeZone("UTC")); + Clusters fClusters = new Clusters(); + org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster(); + fCluster.setType(ClusterType.SOURCE); + fCluster.setName("testCluster"); + Validity validity = new Validity(); + validity.setStart(format.parse("2013-09-01 00:00 UTC")); + validity.setEnd(format.parse("2013-09-06 00:00 UTC")); + fCluster.setValidity(validity); + fClusters.getClusters().add(fCluster); + feed.setClusters(fClusters); + + initCatalogService(); + } + + private void initCatalogService() throws Exception { + CatalogTable table = new CatalogTable(); + String uri = "catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=${YEAR}${MONTH}${DAY};region=us"; + table.setUri(uri); + feed.setTable(table); + + storage = new CatalogStorage(cluster, table); + Configuration configuration = HiveCatalogService.createHiveConf(new Configuration(), storage.getCatalogUrl()); + storage.setConf(configuration); + } + + + @AfterClass + public void tearDown() throws Exception { + dropTable(TABLE_NAME); + dropDatabase(); + TestContext.deleteEntitiesFromStore(); + } + + private void dropTable(String tableName) throws Exception { + client.dropTable(DATABASE_NAME, tableName, true); + } + + private void dropDatabase() throws Exception { + client.dropDatabase(DATABASE_NAME, true, HCatClient.DropDBMode.CASCADE); + } + + private void addPartitions() throws Exception { + putPartition("20130901", "us"); + putPartition("20130902", "us"); + putPartition("20130904", "us"); + putPartition("20130905", "us"); + } + + private void putPartition(String date, String region) throws HCatException { + Map<String, String> partition = new HashMap<String, String>(); + partition.put("ds", date); //yyyyMMDD + partition.put("region", region); + HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create( + DATABASE_NAME, TABLE_NAME, null, partition).build(); + client.addPartition(addPtn); + } + + @Test + public void testGetInstanceAvailabilityStatus() throws Exception { + List<FeedInstanceStatus> instanceStatuses = storage.getListing(feed, cluster.getName(), + LocationType.DATA, format.parse("2013-09-02 00:00 UTC"), format.parse("2013-09-04 00:00 UTC")); + Assert.assertEquals(instanceStatuses.size(), 3); + } + + @Test + public void testGetListing() throws Exception { + FeedInstanceStatus.AvailabilityStatus availabilityStatus = storage.getInstanceAvailabilityStatus( + feed, cluster.getName(), + LocationType.DATA, format.parse("2013-09-03 00:00 UTC")); + Assert.assertEquals(availabilityStatus, FeedInstanceStatus.AvailabilityStatus.MISSING); + } + +}
