http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java deleted file mode 100644 index cca2d8b..0000000 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ /dev/null @@ -1,1292 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.LifeCycle; -import org.apache.falcon.Tag; -import org.apache.falcon.entity.common.FeedDataPath; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.datasource.DatasourceType; -import org.apache.falcon.entity.v0.feed.CatalogTable; -import org.apache.falcon.entity.v0.feed.Cluster; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.entity.v0.feed.ExtractMethod; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.FieldIncludeExclude; -import org.apache.falcon.entity.v0.feed.Lifecycle; -import org.apache.falcon.entity.v0.feed.Load; -import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.entity.v0.feed.Locations; -import org.apache.falcon.entity.v0.feed.MergeType; -import org.apache.falcon.entity.v0.feed.Property; -import org.apache.falcon.entity.v0.feed.RetentionStage; -import org.apache.falcon.entity.v0.feed.Sla; -import org.apache.falcon.entity.v0.feed.Validity; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.expression.ExpressionHelper; -import org.apache.falcon.lifecycle.FeedLifecycleStage; -import org.apache.falcon.resource.APIResult; -import org.apache.falcon.resource.EntityList; -import org.apache.falcon.resource.FeedInstanceResult; -import org.apache.falcon.resource.SchedulableEntityInstance; -import org.apache.falcon.util.BuildProperties; -import org.apache.falcon.util.DateUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import 
org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Calendar; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.TimeZone; -import java.util.regex.Matcher; - -/** - * Feed entity helper methods. - */ -public final class FeedHelper { - - private static final Logger LOG = LoggerFactory.getLogger(FeedHelper.class); - private static final int ONE_MS = 1; - - public static final String FORMAT = "yyyyMMddHHmm"; - - private FeedHelper() {} - - public static Cluster getCluster(Feed feed, String clusterName) { - for (Cluster cluster : feed.getClusters().getClusters()) { - if (cluster.getName().equals(clusterName)) { - return cluster; - } - } - return null; - } - - public static Storage createStorage(Feed feed) throws FalconException { - - final Locations feedLocations = feed.getLocations(); - if (feedLocations != null - && feedLocations.getLocations().size() != 0) { - return new FileSystemStorage(feed); - } - - try { - final CatalogTable table = feed.getTable(); - if (table != null) { - return new CatalogStorage(feed); - } - } catch (URISyntaxException e) { - throw new FalconException(e); - } - - throw new FalconException("Both catalog and locations are not defined."); - } - - public static Storage createStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, - Feed feed) throws FalconException { - return createStorage(getCluster(feed, clusterEntity.getName()), feed, clusterEntity); - } - - public static Storage createStorage(String clusterName, Feed feed) - throws FalconException { - - return createStorage(getCluster(feed, clusterName), feed); - } - - public static Storage createStorage(Cluster cluster, Feed feed) - throws FalconException { - - final org.apache.falcon.entity.v0.cluster.Cluster clusterEntity 
= - EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName()); - - return createStorage(cluster, feed, clusterEntity); - } - - public static Storage createStorage(Cluster cluster, Feed feed, - org.apache.falcon.entity.v0.cluster.Cluster clusterEntity) - throws FalconException { - - final List<Location> locations = getLocations(cluster, feed); - if (locations != null) { - return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations); - } - - try { - final CatalogTable table = getTable(cluster, feed); - if (table != null) { - return new CatalogStorage(clusterEntity, table); - } - } catch (URISyntaxException e) { - throw new FalconException(e); - } - - throw new FalconException("Both catalog and locations are not defined."); - } - - /** - * Factory method to dole out a storage instance used for replication source. - * - * @param clusterEntity cluster entity - * @param feed feed entity - * @return an implementation of Storage - * @throws FalconException - */ - public static Storage createReadOnlyStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, - Feed feed) throws FalconException { - Cluster feedCluster = getCluster(feed, clusterEntity.getName()); - final List<Location> locations = getLocations(feedCluster, feed); - if (locations != null) { - return new FileSystemStorage(ClusterHelper.getReadOnlyStorageUrl(clusterEntity), locations); - } - - try { - final CatalogTable table = getTable(feedCluster, feed); - if (table != null) { - return new CatalogStorage(clusterEntity, table); - } - } catch (URISyntaxException e) { - throw new FalconException(e); - } - - throw new FalconException("Both catalog and locations are not defined."); - } - - public static Storage createStorage(String type, String storageUriTemplate) - throws URISyntaxException { - - Storage.TYPE storageType = Storage.TYPE.valueOf(type); - if (storageType == Storage.TYPE.FILESYSTEM) { - return new FileSystemStorage(storageUriTemplate); - } else if (storageType == 
Storage.TYPE.TABLE) { - return new CatalogStorage(storageUriTemplate); - } - - throw new IllegalArgumentException("Bad type: " + type); - } - - public static Storage createStorage(String type, String storageUriTemplate, - Configuration conf) throws URISyntaxException { - Storage.TYPE storageType = Storage.TYPE.valueOf(type); - if (storageType == Storage.TYPE.FILESYSTEM) { - return new FileSystemStorage(storageUriTemplate); - } else if (storageType == Storage.TYPE.TABLE) { - return new CatalogStorage(storageUriTemplate, conf); - } - - throw new IllegalArgumentException("Bad type: " + type); - } - - public static Storage.TYPE getStorageType(Feed feed) throws FalconException { - final Locations feedLocations = feed.getLocations(); - if (feedLocations != null - && feedLocations.getLocations().size() != 0) { - return Storage.TYPE.FILESYSTEM; - } - - final CatalogTable table = feed.getTable(); - if (table != null) { - return Storage.TYPE.TABLE; - } - - throw new FalconException("Both catalog and locations are not defined."); - } - - public static Storage.TYPE getStorageType(Feed feed, - Cluster cluster) throws FalconException { - final List<Location> locations = getLocations(cluster, feed); - if (locations != null) { - return Storage.TYPE.FILESYSTEM; - } - - final CatalogTable table = getTable(cluster, feed); - if (table != null) { - return Storage.TYPE.TABLE; - } - - throw new FalconException("Both catalog and locations are not defined."); - } - - public static Storage.TYPE getStorageType(Feed feed, - org.apache.falcon.entity.v0.cluster.Cluster clusterEntity) - throws FalconException { - Cluster feedCluster = getCluster(feed, clusterEntity.getName()); - return getStorageType(feed, feedCluster); - } - - public static List<Location> getLocations(Cluster cluster, Feed feed) { - // check if locations are overridden in cluster - final Locations clusterLocations = cluster.getLocations(); - if (clusterLocations != null - && clusterLocations.getLocations().size() != 0) { - 
return clusterLocations.getLocations(); - } - - Locations feedLocations = feed.getLocations(); - return feedLocations == null ? null : feedLocations.getLocations(); - } - - public static Location getLocation(Feed feed, org.apache.falcon.entity.v0.cluster.Cluster cluster, - LocationType type) { - List<Location> locations = getLocations(getCluster(feed, cluster.getName()), feed); - if (locations != null) { - for (Location location : locations) { - if (location.getType() == type) { - return location; - } - } - } - - return null; - } - - public static Sla getSLA(Cluster cluster, Feed feed) { - final Sla clusterSla = cluster.getSla(); - if (clusterSla != null) { - return clusterSla; - } - final Sla feedSla = feed.getSla(); - return feedSla == null ? null : feedSla; - } - - public static Sla getSLA(String clusterName, Feed feed) { - Cluster cluster = FeedHelper.getCluster(feed, clusterName); - return cluster != null ? getSLA(cluster, feed) : null; - } - - protected static CatalogTable getTable(Cluster cluster, Feed feed) { - // check if table is overridden in cluster - if (cluster.getTable() != null) { - return cluster.getTable(); - } - - return feed.getTable(); - } - - public static String normalizePartitionExpression(String part1, String part2) { - String partExp = StringUtils.stripToEmpty(part1) + "/" + StringUtils.stripToEmpty(part2); - partExp = partExp.replaceAll("//+", "/"); - partExp = StringUtils.stripStart(partExp, "/"); - partExp = StringUtils.stripEnd(partExp, "/"); - return partExp; - } - - public static String normalizePartitionExpression(String partition) { - return normalizePartitionExpression(partition, null); - } - - public static Properties getClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) { - Properties properties = new Properties(); - Map<String, String> clusterVars = new HashMap<>(); - clusterVars.put("colo", cluster.getColo()); - clusterVars.put("name", cluster.getName()); - if (cluster.getProperties() != null) { - for 
(org.apache.falcon.entity.v0.cluster.Property property : cluster.getProperties().getProperties()) { - clusterVars.put(property.getName(), property.getValue()); - } - } - properties.put("cluster", clusterVars); - return properties; - } - - public static String evaluateClusterExp(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, String exp) - throws FalconException { - - Properties properties = getClusterProperties(clusterEntity); - ExpressionHelper expHelp = ExpressionHelper.get(); - expHelp.setPropertiesForVariable(properties); - return expHelp.evaluateFullExpression(exp, String.class); - } - - public static String getStagingPath(boolean isSource, - org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, - Feed feed, CatalogStorage storage, Tag tag, String suffix) { - String stagingDirPath = getStagingDir(isSource, clusterEntity, feed, storage, tag); - - String datedPartitionKey = storage.getDatedPartitionKeys().get(0); - String datedPartitionKeySuffix = datedPartitionKey + "=${coord:dataOutPartitionValue('output'," - + "'" + datedPartitionKey + "')}"; - return stagingDirPath + "/" - + datedPartitionKeySuffix + "/" - + suffix + "/" - + "data"; - } - - public static String getStagingDir(boolean isSource, - org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, - Feed feed, CatalogStorage storage, Tag tag) { - String workflowName = EntityUtil.getWorkflowName( - tag, Arrays.asList(clusterEntity.getName()), feed).toString(); - - // log path is created at scheduling wf - final String storageUri = isSource - ? 
ClusterHelper.getReadOnlyStorageUrl(clusterEntity) // read interface - : ClusterHelper.getStorageUrl(clusterEntity); // write interface - return storageUri - + EntityUtil.getLogPath(clusterEntity, feed) + "/" - + workflowName + "/" - + storage.getDatabase() + "/" - + storage.getTable(); - } - - public static Properties getUserWorkflowProperties(LifeCycle lifeCycle) { - Properties props = new Properties(); - props.put("userWorkflowName", lifeCycle.name().toLowerCase() + "-policy"); - props.put("userWorkflowEngine", "falcon"); - - String version; - try { - version = BuildProperties.get().getProperty("build.version"); - } catch (Exception e) { // unfortunate that this is only available in prism/webapp - version = "0.6"; - } - props.put("userWorkflowVersion", version); - return props; - } - - public static Properties getFeedProperties(Feed feed) { - Properties feedProperties = new Properties(); - if (feed.getProperties() != null) { - for (org.apache.falcon.entity.v0.feed.Property property : feed.getProperties().getProperties()) { - feedProperties.put(property.getName(), property.getValue()); - } - } - return feedProperties; - } - - public static Lifecycle getLifecycle(Feed feed, String clusterName) throws FalconException { - Cluster cluster = getCluster(feed, clusterName); - if (cluster !=null) { - return cluster.getLifecycle() != null ? 
cluster.getLifecycle() : feed.getLifecycle(); - } - throw new FalconException("Cluster: " + clusterName + " isn't valid for feed: " + feed.getName()); - } - - public static RetentionStage getRetentionStage(Feed feed, String clusterName) throws FalconException { - if (isLifecycleEnabled(feed, clusterName)) { - Lifecycle globalLifecycle = feed.getLifecycle(); - Lifecycle clusterLifecycle = getCluster(feed, clusterName).getLifecycle(); - - if (clusterLifecycle != null && clusterLifecycle.getRetentionStage() != null) { - return clusterLifecycle.getRetentionStage(); - } else if (globalLifecycle != null) { - return globalLifecycle.getRetentionStage(); - } - } - return null; - } - - public static Date getFeedValidityStart(Feed feed, String clusterName) throws FalconException { - org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName); - if (feedCluster != null) { - return feedCluster.getValidity().getStart(); - } else { - throw new FalconException("No matching cluster " + clusterName - + "found for feed " + feed.getName()); - } - } - - public static Date getNextFeedInstanceDate(Date alignedDate, Feed feed) { - Calendar calendar = Calendar.getInstance(); - calendar.setTime(alignedDate); - calendar.add(feed.getFrequency().getTimeUnit().getCalendarUnit(), - feed.getFrequency().getFrequencyAsInt()); - return calendar.getTime(); - } - - /** - * Returns various policies applicable for a feed. - * - * @param feed - * @return list of names of lifecycle policies for the given feed, empty list if there are none. - */ - public static List<String> getPolicies(Feed feed, String clusterName) throws FalconException { - List<String> result = new ArrayList<>(); - Cluster cluster = getCluster(feed, clusterName); - if (cluster != null) { - if (isLifecycleEnabled(feed, clusterName)) { - String policy = getRetentionStage(feed, clusterName).getPolicy(); - policy = StringUtils.isBlank(policy) - ? 
FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy; - result.add(policy); - } - return result; - } - throw new FalconException("Cluster: " + clusterName + " isn't valid for feed: " + feed.getName()); - } - - /** - * Extracts date from the actual data path e.g., /path/2014/05/06 maps to 2014-05-06T00:00Z. - * @param instancePath - actual data path - * @param templatePath - template path from feed definition - * @param timeZone timeZone - * @return date corresponding to the path - */ - //consider just the first occurrence of the pattern - public static Date getDate(String templatePath, Path instancePath, TimeZone timeZone) { - String path = instancePath.toString(); - Matcher matcher = FeedDataPath.PATTERN.matcher(templatePath); - Calendar cal = Calendar.getInstance(timeZone); - int lastEnd = 0; - - Set<FeedDataPath.VARS> matchedVars = new HashSet<>(); - while (matcher.find()) { - FeedDataPath.VARS pathVar = FeedDataPath.VARS.from(matcher.group()); - String pad = templatePath.substring(lastEnd, matcher.start()); - if (!path.startsWith(pad)) { - //Template and path do not match - return null; - } - - int value; - try { - value = Integer.parseInt(path.substring(pad.length(), pad.length() + pathVar.getValueSize())); - } catch (NumberFormatException e) { - //Not a valid number for variable - return null; - } - - pathVar.setCalendar(cal, value); - lastEnd = matcher.end(); - path = path.substring(pad.length() + pathVar.getValueSize()); - matchedVars.add(pathVar); - } - - String remTemplatePath = templatePath.substring(lastEnd); - //Match the remaining constant at the end - //Handling case where feed instancePath has partitions - if (StringUtils.isNotEmpty(path) && StringUtils.isNotEmpty(remTemplatePath) - && !path.contains(remTemplatePath)) { - return null; - } - - - //Reset other fields - for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) { - if (!matchedVars.contains(var)) { - switch (var.getCalendarField()) { - case Calendar.DAY_OF_MONTH: - 
cal.set(var.getCalendarField(), 1); - break; - default: - cal.set(var.getCalendarField(), 0); - } - } - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - } - return cal.getTime(); - } - - public static Path getFeedBasePath(String feedPath) throws IOException { - Matcher matcher = FeedDataPath.PATTERN.matcher(feedPath); - if (matcher.find()) { - return new Path(feedPath.substring(0, matcher.start())); - } else { - throw new IOException("Unable to resolve pattern for feedPath: " + feedPath); - } - - } - - private static void validateFeedInstance(Feed feed, Date instanceTime, - org.apache.falcon.entity.v0.cluster.Cluster cluster) { - - // validate the cluster - Cluster feedCluster = getCluster(feed, cluster.getName()); - if (feedCluster == null) { - throw new IllegalArgumentException("Cluster :" + cluster.getName() + " is not a valid cluster for feed:" - + feed.getName()); - } - - // validate that instanceTime is in validity range - if (feedCluster.getValidity().getStart().after(instanceTime) - || !feedCluster.getValidity().getEnd().after(instanceTime)) { - throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not in validity range for" - + " Feed: " + feed.getName() + " on cluster:" + cluster.getName()); - } - - // validate instanceTime on basis of startTime and frequency - Date nextInstance = EntityUtil.getNextStartTime(feedCluster.getValidity().getStart(), feed.getFrequency(), - feed.getTimezone(), instanceTime); - if (!nextInstance.equals(instanceTime)) { - throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not a valid instance for the " - + " feed: " + feed.getName() + " on cluster: " + cluster.getName() - + " on the basis of startDate and frequency"); - } - } - - /** - * Given a feed Instance finds the generating process instance. - * - * [process, cluster, instanceTime] - * - * If the feed is replicated, then it returns null. 
- * - * @param feed output feed - * @param feedInstanceTime instance time of the feed - * @return returns the instance of the process which produces the given feed - */ - public static SchedulableEntityInstance getProducerInstance(Feed feed, Date feedInstanceTime, - org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException { - - //validate the inputs - validateFeedInstance(feed, feedInstanceTime, cluster); - Process process = getProducerProcess(feed); - if (process != null) { - org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, - cluster.getName()); - Date pStart = processCluster.getValidity().getStart(); - Date pEnd = processCluster.getValidity().getEnd(); - Frequency pFrequency = process.getFrequency(); - TimeZone pTz = process.getTimezone(); - - try { - Date processInstanceTime = getProducerInstanceTime(feed, feedInstanceTime, process, cluster); - boolean isValid = EntityUtil.isValidInstanceTime(pStart, pFrequency, pTz, processInstanceTime); - if (processInstanceTime.before(pStart) || !processInstanceTime.before(pEnd) || !isValid){ - return null; - } - - SchedulableEntityInstance producer = new SchedulableEntityInstance(process.getName(), cluster.getName(), - processInstanceTime, EntityType.PROCESS); - producer.setTags(SchedulableEntityInstance.OUTPUT); - return producer; - } catch (FalconException | IllegalArgumentException e) { - LOG.error("Error in trying to get producer process: {}'s instance time for feed: {}'s instance: } " - + " on cluster:{}", process.getName(), feed.getName(), feedInstanceTime, cluster.getName()); - } - } - return null; - } - - /** - * Given a feed find it's generating process. - * - * If no generating process is found it returns null. - * @param feed output feed - * @return Process which produces the given feed. 
- */ - public static Process getProducerProcess(Feed feed) throws FalconException { - - EntityList dependencies = EntityUtil.getEntityDependencies(feed); - - for (EntityList.EntityElement e : dependencies.getElements()) { - if (e.tag.contains(EntityList.OUTPUT_TAG)) { - return EntityUtil.getEntity(EntityType.PROCESS, e.name); - } - } - return null; - } - - /** - * Find the producerInstance which will generate the given feedInstance. - * - * @param feed output feed - * @param feedInstanceTime instance time of the output feed - * @param producer producer process - * @return time of the producer instance which will produce the given feed instance. - */ - private static Date getProducerInstanceTime(Feed feed, Date feedInstanceTime, Process producer, - org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException { - - String clusterName = cluster.getName(); - Cluster feedCluster = getCluster(feed, clusterName); - org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(producer, clusterName); - Date producerStartDate = processCluster.getValidity().getStart(); - - // read the process definition and find the relative time difference between process and output feed - // if output process instance time is now then output FeedInstance time is x - String outputInstance = null; - for (Output op : producer.getOutputs().getOutputs()) { - if (StringUtils.equals(feed.getName(), op.getFeed())) { - outputInstance = op.getInstance(); - } - } - - ExpressionHelper.setReferenceDate(producerStartDate); - ExpressionHelper evaluator = ExpressionHelper.get(); - // producerInstance = feedInstanceTime + (difference between producer process and feed) - // the feedInstance before or equal to this time is the required one - Date relativeFeedInstance = evaluator.evaluate(outputInstance, Date.class); - Date feedInstanceActual = EntityUtil.getPreviousInstanceTime(feedCluster.getValidity().getStart(), - feed.getFrequency(), feed.getTimezone(), 
relativeFeedInstance); - Long producerInstanceTime = feedInstanceTime.getTime() + (producerStartDate.getTime() - - feedInstanceActual.getTime()); - Date producerInstance = new Date(producerInstanceTime); - - //validate that the producerInstance is in the validity range on the provided cluster - if (producerInstance.before(processCluster.getValidity().getStart()) - || producerInstance.after(processCluster.getValidity().getEnd())) { - throw new IllegalArgumentException("Instance time provided: " + feedInstanceTime - + " for feed " + feed.getName() - + " is outside the range of instances produced by the producer process: " + producer.getName() - + " in it's validity range on provided cluster: " + cluster.getName()); - } - return producerInstance; - } - - - public static Set<SchedulableEntityInstance> getConsumerInstances(Feed feed, Date feedInstanceTime, - org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException { - - Set<SchedulableEntityInstance> result = new HashSet<>(); - // validate that the feed has this cluster & validate that the instanceTime is a valid instanceTime - validateFeedInstance(feed, feedInstanceTime, cluster); - - Set<Process> consumers = getConsumerProcesses(feed); - for (Process p : consumers) { - Set<Date> consumerInstanceTimes = getConsumerProcessInstanceTimes(feed, feedInstanceTime, p, cluster); - for (Date date : consumerInstanceTimes) { - SchedulableEntityInstance in = new SchedulableEntityInstance(p.getName(), cluster.getName(), date, - EntityType.PROCESS); - in.setTags(SchedulableEntityInstance.INPUT); - result.add(in); - } - } - return result; - } - - - /** - * Returns the consumer processes for a given feed if any, null otherwise. - * - * @param feed input feed - * @return the set of processes which use the given feed as input, empty set if no consumers. 
- */ - public static Set<Process> getConsumerProcesses(Feed feed) throws FalconException { - Set<Process> result = new HashSet<>(); - EntityList dependencies = EntityUtil.getEntityDependencies(feed); - - for (EntityList.EntityElement e : dependencies.getElements()) { - if (e.tag.contains(EntityList.INPUT_TAG)) { - Process consumer = EntityUtil.getEntity(EntityType.PROCESS, e.name); - result.add(consumer); - } - } - return result; - } - - // return all instances of a process which will consume the given feed instance - private static Set<Date> getConsumerProcessInstanceTimes(Feed feed, Date feedInstancetime, Process consumer, - org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException { - - Set<Date> result = new HashSet<>(); - // find relevant cluster for the process - org.apache.falcon.entity.v0.process.Cluster processCluster = - ProcessHelper.getCluster(consumer, cluster.getName()); - if (processCluster == null) { - throw new IllegalArgumentException("Cluster is not valid for process"); - } - Date processStartDate = processCluster.getValidity().getStart(); - Cluster feedCluster = getCluster(feed, cluster.getName()); - Date feedStartDate = feedCluster.getValidity().getStart(); - - // find all corresponding Inputs as a process may refer same feed multiple times - List<Input> inputFeeds = new ArrayList<>(); - if (consumer.getInputs() != null && consumer.getInputs().getInputs() != null) { - for (Input input : consumer.getInputs().getInputs()) { - if (StringUtils.equals(input.getFeed(), feed.getName())) { - inputFeeds.add(input); - } - } - } - - // for each input corresponding to given feed, find corresponding consumer instances - for (Input in : inputFeeds) { - /* Algorithm for finding a consumer instance for an input feed instance - Step 1. Find one instance which will consume the given feed instance. - a. take process start date and find last input feed instance time. In this step take care of - frequencies being out of sync. - b. 
using the above find the time difference between the process instance and feed instance. - c. Adding the above time difference to given feed instance for which we want to find the consumer - instances we will get one consumer process instance. - Step 2. Keep checking for next instances of process till they consume the given feed Instance. - Step 3. Similarly check for all previous instances of process till they consume the given feed instance. - */ - - // Step 1.a & 1.b - ExpressionHelper.setReferenceDate(processStartDate); - ExpressionHelper evaluator = ExpressionHelper.get(); - Date startRelative = evaluator.evaluate(in.getStart(), Date.class); - Date startTimeActual = EntityUtil.getPreviousInstanceTime(feedStartDate, - feed.getFrequency(), feed.getTimezone(), startRelative); - Long offset = processStartDate.getTime() - startTimeActual.getTime(); - - // Step 1.c - Date processInstanceStartRelative = new Date(feedInstancetime.getTime() + offset); - Date processInstanceStartActual = EntityUtil.getPreviousInstanceTime(processStartDate, - consumer.getFrequency(), consumer.getTimezone(), processInstanceStartRelative); - - - // Step 2. 
- Date currentInstance = processInstanceStartActual; - while (true) { - Date nextConsumerInstance = EntityUtil.getNextStartTime(processStartDate, - consumer.getFrequency(), consumer.getTimezone(), currentInstance); - - ExpressionHelper.setReferenceDate(nextConsumerInstance); - evaluator = ExpressionHelper.get(); - Date inputStart = evaluator.evaluate(in.getStart(), Date.class); - Long rangeStart = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(), - feed.getTimezone(), inputStart).getTime(); - Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime(); - if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() <= rangeEnd) { - if (!nextConsumerInstance.before(processCluster.getValidity().getStart()) - && nextConsumerInstance.before(processCluster.getValidity().getEnd())) { - result.add(nextConsumerInstance); - } - } else { - break; - } - currentInstance = new Date(nextConsumerInstance.getTime() + ONE_MS); - } - - // Step 3. - currentInstance = processInstanceStartActual; - while (true) { - Date nextConsumerInstance = EntityUtil.getPreviousInstanceTime(processStartDate, - consumer.getFrequency(), consumer.getTimezone(), currentInstance); - - ExpressionHelper.setReferenceDate(nextConsumerInstance); - evaluator = ExpressionHelper.get(); - Date inputStart = evaluator.evaluate(in.getStart(), Date.class); - Long rangeStart = EntityUtil.getPreviousInstanceTime(feedStartDate, feed.getFrequency(), - feed.getTimezone(), inputStart).getTime(); - Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime(); - if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() <= rangeEnd) { - if (!nextConsumerInstance.before(processCluster.getValidity().getStart()) - && nextConsumerInstance.before(processCluster.getValidity().getEnd())) { - result.add(nextConsumerInstance); - } - } else { - break; - } - currentInstance = new Date(nextConsumerInstance.getTime() - ONE_MS); - } - } - return result; - } - - public 
static FeedInstanceResult getFeedInstanceListing(Entity entityObject, - Date start, Date end) throws FalconException { - Set<String> clusters = EntityUtil.getClustersDefinedInColos(entityObject); - FeedInstanceResult result = new FeedInstanceResult(APIResult.Status.SUCCEEDED, "Success"); - for (String cluster : clusters) { - Feed feed = (Feed) entityObject; - Storage storage = createStorage(cluster, feed); - List<FeedInstanceStatus> feedListing = storage.getListing(feed, cluster, LocationType.DATA, start, end); - FeedInstanceResult.Instance[] instances = new FeedInstanceResult.Instance[feedListing.size()]; - int index = 0; - for (FeedInstanceStatus feedStatus : feedListing) { - FeedInstanceResult.Instance instance = new - FeedInstanceResult.Instance(cluster, feedStatus.getInstance(), - feedStatus.getStatus().name()); - instance.creationTime = feedStatus.getCreationTime(); - instance.uri = feedStatus.getUri(); - instance.size = feedStatus.getSize(); - instance.sizeH = feedStatus.getSizeH(); - instances[index++] = instance; - } - result.setInstances(instances); - } - return result; - } - - - /** - * Returns the data source type associated with the Feed's import policy. - * - * @param clusterEntity - * @param feed - * @return {@link org.apache.falcon.entity.v0.datasource.DatasourceType} - * @throws FalconException - */ - public static DatasourceType getImportDatasourceType( - org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, - Feed feed) throws FalconException { - Cluster feedCluster = getCluster(feed, clusterEntity.getName()); - if (isImportEnabled(feedCluster)) { - return DatasourceHelper.getDatasourceType(getImportDatasourceName(feedCluster)); - } else { - return null; - } - } - - /** - * Return if Import policy is enabled in the Feed definition. 
- * - * @param feedCluster - * @return true if import policy is enabled else false - */ - - public static boolean isImportEnabled(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { - if (feedCluster.getType() == ClusterType.SOURCE) { - return (feedCluster.getImport() != null); - } - return false; - } - - - - /** - * Returns the data source name associated with the Feed's import policy. - * - * @param feedCluster - * @return DataSource name defined in the Datasource Entity - */ - public static String getImportDatasourceName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { - if (isImportEnabled(feedCluster)) { - return feedCluster.getImport().getSource().getName(); - } else { - return null; - } - } - - - - /** - * Returns Datasource table name. - * - * @param feedCluster - * @return Table or Topic name of the Datasource - */ - - public static String getImportDataSourceTableName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { - if (isImportEnabled(feedCluster)) { - return feedCluster.getImport().getSource().getTableName(); - } else { - return null; - } - } - - - - /** - * Returns the extract method type. - * - * @param feedCluster - * @return {@link org.apache.falcon.entity.v0.feed.ExtractMethod} - */ - - public static ExtractMethod getImportExtractMethod(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { - if (isImportEnabled(feedCluster)) { - return feedCluster.getImport().getSource().getExtract().getType(); - } else { - return null; - } - } - - - - /** - * Returns the merge type of the Feed import policy. - * - * @param feedCluster - * @return {@link org.apache.falcon.entity.v0.feed.MergeType} - */ - public static MergeType getImportMergeType(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { - if (isImportEnabled(feedCluster)) { - return feedCluster.getImport().getSource().getExtract().getMergepolicy(); - } else { - return null; - } - } - - /** - * Returns the initial instance date for the import data set for coorinator. 
- * - * @param feedCluster - * @return Feed cluster validity start date or recent time - */ - public static Date getImportInitalInstance(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { - return feedCluster.getValidity().getStart(); - } - - - /** - * Helper method to check if the merge type is snapshot. - * - * @param feedCluster - * @return true if the feed import policy merge type is snapshot - * - */ - public static boolean isSnapshotMergeType(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { - return MergeType.SNAPSHOT == getImportMergeType(feedCluster); - } - - /** - * Returns extra arguments specified in the Feed import policy. - * - * @param feedCluster - * @return - * @throws FalconException - */ - public static Map<String, String> getImportArguments(org.apache.falcon.entity.v0.feed.Cluster feedCluster) - throws FalconException { - - Map<String, String> argsMap = new HashMap<String, String>(); - if (feedCluster.getImport().getArguments() == null) { - return argsMap; - } - - for(org.apache.falcon.entity.v0.feed.Argument p : feedCluster.getImport().getArguments().getArguments()) { - argsMap.put(p.getName().toLowerCase(), p.getValue()); - } - return argsMap; - } - - - - - /** - * Returns Fields list specified in the Import Policy. - * - * @param feedCluster - * @return List of String - * @throws FalconException - */ - public static List<String> getImportFieldList(org.apache.falcon.entity.v0.feed.Cluster feedCluster) - throws FalconException { - if (feedCluster.getImport().getSource().getFields() == null) { - return null; - } - org.apache.falcon.entity.v0.feed.FieldsType fieldType = feedCluster.getImport().getSource().getFields(); - FieldIncludeExclude includeFileds = fieldType.getIncludes(); - if (includeFileds == null) { - return null; - } - return includeFileds.getFields(); - } - - - /** - * Returns true if exclude field lists are used. This is a TBD feature. 
- * - * @param ds Feed Datasource - * @return true of exclude field list is used or false. - * @throws FalconException - */ - - public static boolean isFieldExcludes(org.apache.falcon.entity.v0.feed.Datasource ds) - throws FalconException { - if (ds.getFields() != null) { - org.apache.falcon.entity.v0.feed.FieldsType fieldType = ds.getFields(); - FieldIncludeExclude excludeFileds = fieldType.getExcludes(); - if ((excludeFileds != null) && (excludeFileds.getFields().size() > 0)) { - return true; - } - } - return false; - } - - public static FeedInstanceStatus.AvailabilityStatus getFeedInstanceStatus(Feed feed, String clusterName, - Date instanceTime) - throws FalconException { - Storage storage = createStorage(clusterName, feed); - return storage.getInstanceAvailabilityStatus(feed, clusterName, LocationType.DATA, instanceTime); - } - - public static boolean isLifecycleEnabled(Feed feed, String clusterName) { - Cluster cluster = getCluster(feed, clusterName); - return cluster != null && (feed.getLifecycle() != null || cluster.getLifecycle() != null); - } - - public static Frequency getLifecycleRetentionFrequency(Feed feed, String clusterName) throws FalconException { - Frequency retentionFrequency = null; - RetentionStage retentionStage = getRetentionStage(feed, clusterName); - if (retentionStage != null) { - if (retentionStage.getFrequency() != null) { - retentionFrequency = retentionStage.getFrequency(); - } else { - Frequency feedFrequency = feed.getFrequency(); - Frequency defaultFrequency = new Frequency("hours(6)"); - if (DateUtil.getFrequencyInMillis(feedFrequency) < DateUtil.getFrequencyInMillis(defaultFrequency)) { - retentionFrequency = defaultFrequency; - } else { - retentionFrequency = new Frequency(feedFrequency.toString()); - } - } - } - return retentionFrequency; - } - - /** - * Returns the hadoop cluster queue name specified for the replication jobs to run in the Lifecycle - * section of the target cluster section of the feed entity. 
- * - * NOTE: Lifecycle for replication is not implemented. This will return the queueName property value. - * - * @param feed - * @param clusterName - * @return hadoop cluster queue name specified in the feed entity - * @throws FalconException - */ - - public static String getLifecycleReplicationQueue(Feed feed, String clusterName) throws FalconException { - return null; - } - - /** - * Returns the hadoop cluster queue name specified for the retention jobs to run in the Lifecycle - * section of feed entity. - * - * @param feed - * @param clusterName - * @return hadoop cluster queue name specified in the feed entity - * @throws FalconException - */ - public static String getLifecycleRetentionQueue(Feed feed, String clusterName) throws FalconException { - RetentionStage retentionStage = getRetentionStage(feed, clusterName); - if (retentionStage != null) { - return retentionStage.getQueue(); - } else { - return null; - } - } - - /** - * Returns the data source type associated with the Feed's export policy. - * - * @param clusterEntity - * @param feed - * @return {@link org.apache.falcon.entity.v0.datasource.DatasourceType} - * @throws FalconException - */ - public static DatasourceType getExportDatasourceType( - org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, - Feed feed) throws FalconException { - Cluster feedCluster = getCluster(feed, clusterEntity.getName()); - if (isExportEnabled(feedCluster)) { - return DatasourceHelper.getDatasourceType(getExportDatasourceName(feedCluster)); - } else { - return null; - } - } - - /** - * Return if Export policy is enabled in the Feed definition. - * - * @param feedCluster - * @return true if export policy is enabled else false - */ - - public static boolean isExportEnabled(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { - return (feedCluster.getExport() != null); - } - - /** - * Returns the data source name associated with the Feed's export policy. 
- * - * @param feedCluster - * @return DataSource name defined in the Datasource Entity - */ - public static String getExportDatasourceName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { - if (isExportEnabled(feedCluster)) { - return feedCluster.getExport().getTarget().getName(); - } else { - return null; - } - } - - /** - * Returns Datasource table name. - * - * @param feedCluster - * @return Table or Topic name of the Datasource - */ - - public static String getExportDataSourceTableName(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { - if (isExportEnabled(feedCluster)) { - return feedCluster.getExport().getTarget().getTableName(); - } else { - return null; - } - } - - /** - * Returns the export load type. - * - * @param feedCluster - * @return {@link org.apache.falcon.entity.v0.feed.Load} - */ - - public static Load getExportLoadMethod(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { - if (isExportEnabled(feedCluster)) { - return feedCluster.getExport().getTarget().getLoad(); - } else { - return null; - } - } - - /** - * Returns the initial instance date for the export data set for coorinator. - * - * @param feedCluster - * @return Feed cluster validity start date or recent time - */ - public static Date getExportInitalInstance(org.apache.falcon.entity.v0.feed.Cluster feedCluster) { - return feedCluster.getValidity().getStart(); - } - - /** - * Returns extra arguments specified in the Feed export policy. 
- * - * @param feedCluster - * @return - * @throws FalconException - */ - public static Map<String, String> getExportArguments(org.apache.falcon.entity.v0.feed.Cluster feedCluster) - throws FalconException { - - Map<String, String> argsMap = new HashMap<String, String>(); - if (feedCluster.getExport().getArguments() == null) { - return argsMap; - } - - for(org.apache.falcon.entity.v0.feed.Argument p : feedCluster.getExport().getArguments().getArguments()) { - argsMap.put(p.getName().toLowerCase(), p.getValue()); - } - return argsMap; - } - - public static Validity getClusterValidity(Feed feed, String clusterName) throws FalconException { - Cluster cluster = getCluster(feed, clusterName); - if (cluster == null) { - throw new FalconException("Invalid cluster: " + clusterName + " for feed: " + feed.getName()); - } - return cluster.getValidity(); - } - - public static Frequency getOldRetentionFrequency(Feed feed) { - Frequency feedFrequency = feed.getFrequency(); - Frequency defaultFrequency = new Frequency("hours(24)"); - if (DateUtil.getFrequencyInMillis(feedFrequency) < DateUtil.getFrequencyInMillis(defaultFrequency)) { - return new Frequency("hours(6)"); - } else { - return defaultFrequency; - } - } - - public static Frequency getRetentionFrequency(Feed feed, Cluster feedCluster) throws FalconException { - Frequency retentionFrequency; - retentionFrequency = getLifecycleRetentionFrequency(feed, feedCluster.getName()); - if (retentionFrequency == null) { - retentionFrequency = getOldRetentionFrequency(feed); - } - return retentionFrequency; - } - - public static int getRetentionLimitInSeconds(Feed feed, String clusterName) throws FalconException { - Frequency retentionLimit = new Frequency("minutes(0)"); - RetentionStage retentionStage = getRetentionStage(feed, clusterName); - if (retentionStage != null) { - for (Property property : retentionStage.getProperties().getProperties()) { - if (property.getName().equalsIgnoreCase("retention.policy.agebaseddelete.limit")) { 
- retentionLimit = new Frequency(property.getValue()); - break; - } - } - } else { - retentionLimit = getCluster(feed, clusterName).getRetention().getLimit(); - } - Long freqInMillis = DateUtil.getFrequencyInMillis(retentionLimit); - return (int) (freqInMillis/1000); - } - - /** - * Returns the replication job's queue name specified in the feed entity definition. - * First looks into the Lifecycle stage if exists. If null, looks into the queueName property specified - * in the Feed definition. - * - * @param feed - * @param feedCluster - * @return - * @throws FalconException - */ - public static String getReplicationQueue(Feed feed, Cluster feedCluster) throws FalconException { - String queueName; - queueName = getLifecycleReplicationQueue(feed, feedCluster.getName()); - if (StringUtils.isBlank(queueName)) { - queueName = getQueueFromProperties(feed); - } - return queueName; - } - - /** - * Returns the retention job's queue name specified in the feed entity definition. - * First looks into the Lifecycle stage. If null, looks into the queueName property specified - * in the Feed definition. - * - * @param feed - * @param feedCluster - * @return - * @throws FalconException - */ - public static String getRetentionQueue(Feed feed, Cluster feedCluster) throws FalconException { - String queueName = getLifecycleRetentionQueue(feed, feedCluster.getName()); - if (StringUtils.isBlank(queueName)) { - queueName = getQueueFromProperties(feed); - } - return queueName; - } - - /** - * Returns the queue name specified in the Feed entity definition from queueName property. - * - * @param feed - * @return queueName property value - */ - public static String getQueueFromProperties(Feed feed) { - return getPropertyValue(feed, EntityUtil.MR_QUEUE_NAME); - } - - /** - * Returns value of a feed property given property name. 
- * @param feed - * @param propName - * @return property value - */ - - public static String getPropertyValue(Feed feed, String propName) { - if (feed.getProperties() != null) { - for (Property prop : feed.getProperties().getProperties()) { - if ((prop != null) && (prop.getName().equals(propName))) { - return prop.getValue(); - } - } - } - return null; - } -}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java b/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java deleted file mode 100644 index 8b43671..0000000 --- a/common/src/main/java/org/apache/falcon/entity/FeedInstanceStatus.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.entity; - -/** - * Feed Instance Status is used to provide feed instance listing and corresponding status. - * - * This is used for exchanging information for getListing api - */ -public class FeedInstanceStatus { - - private String instance; - - private final String uri; - - private long creationTime; - - private long size = -1; - - private String sizeH; - - private AvailabilityStatus status = AvailabilityStatus.MISSING; - - /** - * Availability status of a feed instance. 
- * - * Missing if the feed partition is entirely missing, - * Available if present and the availability flag is also present - * Availability flag is configured in feed definition, but availability flag is missing in data path - * Empty if the empty - */ - public enum AvailabilityStatus {MISSING, AVAILABLE, PARTIAL, EMPTY} - - public FeedInstanceStatus(String uri) { - this.uri = uri; - } - - public String getInstance() { - return instance; - } - - public void setInstance(String instance) { - this.instance = instance; - } - - public String getUri() { - return uri; - } - - public long getCreationTime() { - return creationTime; - } - - public void setCreationTime(long creationTime) { - this.creationTime = creationTime; - } - - public long getSize() { - return size; - } - - public String getSizeH(){ - return sizeH; - } - - public void setSize(long size) { - this.size = size; - } - - public void setSizeH(String sizeH) { - this.sizeH = sizeH; - } - - - public AvailabilityStatus getStatus() { - return status; - } - - public void setStatus(AvailabilityStatus status) { - this.status = status; - } - - @Override - public String toString() { - return "FeedInstanceStatus{" - + "instance='" + instance + '\'' - + ", uri='" + uri + '\'' - + ", creationTime=" + creationTime - + ", size=" + size - + ", status='" + status + '\'' - + '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - FeedInstanceStatus that = (FeedInstanceStatus) o; - - if (creationTime != that.creationTime) { - return false; - } - if (size != that.size) { - return false; - } - if (!instance.equals(that.instance)) { - return false; - } - if (status != that.status) { - return false; - } - if (uri != null ? 
!uri.equals(that.uri) : that.uri != null) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - int result = instance.hashCode(); - result = 31 * result + (uri != null ? uri.hashCode() : 0); - result = 31 * result + (int) (creationTime ^ (creationTime >>> 32)); - result = 31 * result + (int) (size ^ (size >>> 32)); - result = 31 * result + (status != null ? status.hashCode() : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java deleted file mode 100644 index ece8b5d..0000000 --- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java +++ /dev/null @@ -1,509 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.Pair; -import org.apache.falcon.entity.common.FeedDataPath; -import org.apache.falcon.entity.v0.AccessControlList; -import org.apache.falcon.entity.v0.SchemaHelper; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.entity.v0.feed.Locations; -import org.apache.falcon.expression.ExpressionHelper; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.retention.EvictedInstanceSerDe; -import org.apache.falcon.retention.EvictionHelper; -import org.apache.falcon.security.CurrentUser; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.jsp.el.ELException; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.TimeZone; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * A file system implementation of a feed storage. 
 */
public class FileSystemStorage extends Configured implements Storage {

    private static final Logger LOG = LoggerFactory.getLogger(FileSystemStorage.class);

    // Accumulated during evict(): paths of deleted instances and their dates (comma-separated).
    private final StringBuffer instancePaths = new StringBuffer();
    private final StringBuilder instanceDates = new StringBuilder();

    // Separators used when serializing all locations into a single URI-template string.
    public static final String FEED_PATH_SEP = "#";
    public static final String LOCATION_TYPE_SEP = "=";

    // Placeholder storage URL, resolved later (e.g. to the cluster's name node).
    public static final String FILE_SYSTEM_URL = "${nameNode}";

    private final String storageUrl;
    private final List<Location> locations;

    public FileSystemStorage(Feed feed) {
        this(FILE_SYSTEM_URL, feed.getLocations());
    }

    protected FileSystemStorage(String storageUrl, Locations locations) {
        this(storageUrl, locations.getLocations());
    }

    // Primary constructor: both a storage URL and at least one location are required.
    protected FileSystemStorage(String storageUrl, List<Location> locations) {
        if (storageUrl == null || storageUrl.length() == 0) {
            throw new IllegalArgumentException("FileSystem URL cannot be null or empty");
        }

        if (locations == null || locations.size() == 0) {
            throw new IllegalArgumentException("FileSystem Locations cannot be null or empty");
        }

        this.storageUrl = storageUrl;
        this.locations = locations;
    }

    /**
     * Create an instance from the URI Template that was generated using
     * the getUriTemplate() method.
     *
     * @param uriTemplate the uri template from org.apache.falcon.entity.FileSystemStorage#getUriTemplate
     * @throws URISyntaxException
     */
    protected FileSystemStorage(String uriTemplate) throws URISyntaxException {
        if (uriTemplate == null || uriTemplate.length() == 0) {
            throw new IllegalArgumentException("URI template cannot be null or empty");
        }

        String rawStorageUrl = null;
        List<Location> rawLocations = new ArrayList<Location>();
        // Template shape: "TYPE=path#TYPE=path#..." (see getUriTemplate()).
        String[] feedLocs = uriTemplate.split(FEED_PATH_SEP);
        for (String rawPath : feedLocs) {
            String[] typeAndPath = rawPath.split(LOCATION_TYPE_SEP);
            // Normalize the "${...}" EL markers so the string parses as a valid URI.
            final String processed = typeAndPath[1].replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
                    .replaceAll("}", EXPR_CLOSE_NORMALIZED);
            URI uri = new URI(processed);
            if (rawStorageUrl == null) {
                rawStorageUrl = uri.getScheme() + "://" + uri.getAuthority();
            }

            String path = uri.getPath();
            // Restore the EL markers that were normalized above.
            final String finalPath = path.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX)
                    .replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX);

            Location location = new Location();
            location.setPath(finalPath);
            location.setType(LocationType.valueOf(typeAndPath[0]));
            rawLocations.add(location);
        }

        this.storageUrl = rawStorageUrl;
        this.locations = rawLocations;
    }

    @Override
    public TYPE getType() {
        return TYPE.FILESYSTEM;
    }

    public String getStorageUrl() {
        return storageUrl;
    }

    public List<Location> getLocations() {
        return locations;
    }

    /**
     * Serializes all configured location types into a single "TYPE=path#TYPE=path..." string;
     * META/STATS/TMP are appended only when configured.
     */
    @Override
    public String getUriTemplate() {
        String feedPathMask = getUriTemplate(LocationType.DATA);
        String metaPathMask = getUriTemplate(LocationType.META);
        String statsPathMask = getUriTemplate(LocationType.STATS);
        String tmpPathMask = getUriTemplate(LocationType.TMP);

        StringBuilder feedBasePaths = new StringBuilder();
        feedBasePaths.append(LocationType.DATA.name())
                .append(LOCATION_TYPE_SEP)
                .append(feedPathMask);

        if (metaPathMask != null) {
            feedBasePaths.append(FEED_PATH_SEP)
                    .append(LocationType.META.name())
                    .append(LOCATION_TYPE_SEP)
                    .append(metaPathMask);
        }

        if (statsPathMask != null) {
            feedBasePaths.append(FEED_PATH_SEP)
                    .append(LocationType.STATS.name())
                    .append(LOCATION_TYPE_SEP)
                    .append(statsPathMask);
        }

        if (tmpPathMask != null) {
            feedBasePaths.append(FEED_PATH_SEP)
                    .append(LocationType.TMP.name())
                    .append(LOCATION_TYPE_SEP)
                    .append(tmpPathMask);
        }

        return feedBasePaths.toString();
    }

    @Override
    public String getUriTemplate(LocationType locationType) {
        return getUriTemplate(locationType, locations);
    }

    // Returns the fully-qualified path template for the requested location type,
    // or null when that type is not configured.
    public String getUriTemplate(LocationType locationType, List<Location> locationList) {
        Location locationForType = null;
        for (Location location : locationList) {
            if (location.getType() == locationType) {
                locationForType = location;
                break;
            }
        }

        if (locationForType == null || StringUtils.isEmpty(locationForType.getPath())) {
            return null;
        }

        // normalize the path so trailing and double '/' are removed
        Path locationPath = new Path(locationForType.getPath());
        locationPath = locationPath.makeQualified(getDefaultUri(), getWorkingDir());

        if (isRelativePath(locationPath)) {
            locationPath = new Path(storageUrl + locationPath);
        }

        return locationPath.toString();
    }

    private boolean isRelativePath(Path locationPath) {
        return locationPath.toUri().getAuthority() == null && isStorageUrlATemplate();
    }

    private boolean isStorageUrlATemplate() {
        return storageUrl.startsWith(FILE_SYSTEM_URL);
    }

    private URI getDefaultUri() {
        return new Path(isStorageUrlATemplate() ? "/" : storageUrl).toUri();
    }

    public Path getWorkingDir() {
        return new Path(CurrentUser.isAuthenticated() ?
                "/user/" + CurrentUser.getUser() : "/");
    }

    // Two FS storages are identical when every location type resolves to the same template.
    @Override
    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
        if (!(toCompareAgainst instanceof FileSystemStorage)) {
            return false;
        }

        FileSystemStorage fsStorage = (FileSystemStorage) toCompareAgainst;
        final List<Location> fsStorageLocations = fsStorage.getLocations();

        return getLocations().size() == fsStorageLocations.size()
                && StringUtils.equals(getUriTemplate(LocationType.DATA, getLocations()),
                    getUriTemplate(LocationType.DATA, fsStorageLocations))
                && StringUtils.equals(getUriTemplate(LocationType.STATS, getLocations()),
                    getUriTemplate(LocationType.STATS, fsStorageLocations))
                && StringUtils.equals(getUriTemplate(LocationType.META, getLocations()),
                    getUriTemplate(LocationType.META, fsStorageLocations))
                && StringUtils.equals(getUriTemplate(LocationType.TMP, getLocations()),
                    getUriTemplate(LocationType.TMP, fsStorageLocations));
    }

    // Returns the first location of the given type, or null when none is configured.
    public static Location getLocation(List<Location> locations, LocationType type) {
        for (Location loc : locations) {
            if (loc.getType() == type) {
                return loc;
            }
        }

        return null;
    }

    // Validates that for every existing location path, either the ACL owner matches the file
    // owner or the current user belongs to the ACL group; throws otherwise.
    @Override
    public void validateACL(AccessControlList acl) throws FalconException {
        try {
            for (Location location : getLocations()) {
                String pathString = getRelativePath(location);
                Path path = new Path(pathString);
                FileSystem fileSystem =
                        HadoopClientFactory.get().createProxiedFileSystem(path.toUri(), getConf());
                if (fileSystem.exists(path)) {
                    FileStatus fileStatus = fileSystem.getFileStatus(path);
                    Set<String> groups = CurrentUser.getGroupNames();

                    if (fileStatus.getOwner().equals(acl.getOwner())
                            || groups.contains(acl.getGroup())) {
                        return;
                    }

                    LOG.error("Permission denied: Either Feed ACL owner {} or group {} doesn't "
                            + "match the actual file owner {} or group {} for file {}",
                            acl, acl.getGroup(), fileStatus.getOwner(), fileStatus.getGroup(), path);
                    throw new FalconException("Permission denied: Either Feed ACL owner "
                            + acl + " or group " + acl.getGroup() + " doesn't match the actual "
                            + "file owner " + fileStatus.getOwner() + " or group "
                            + fileStatus.getGroup() + " for file " + path);
                }
            }
        } catch (IOException e) {
            LOG.error("Can't validate ACL on storage {}", getStorageUrl(), e);
            throw new RuntimeException("Can't validate storage ACL (URI " + getStorageUrl() + ")", e);
        }
    }

    // Evicts instances older than the retention limit from every configured location,
    // then serializes the deleted paths to logFilePath; returns the CSV of deleted dates.
    @Override
    public StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath) throws FalconException {
        TimeZone tz = TimeZone.getTimeZone(timeZone);
        try{
            for (Location location : getLocations()) {
                fileSystemEvictor(getUriTemplate(location.getType()), retentionLimit, tz, logFilePath);
            }
            // Persist the paths of everything that was deleted, for audit/logging.
            EvictedInstanceSerDe.serializeEvictedInstancePaths(
                    HadoopClientFactory.get().createProxiedFileSystem(logFilePath.toUri(), getConf()),
                    logFilePath, instancePaths);
        }catch (IOException e){
            throw new FalconException("Couldn't evict feed from fileSystem", e);
        }catch (ELException e){
            throw new FalconException("Couldn't evict feed from fileSystem", e);
        }

        return instanceDates;
    }

    private void fileSystemEvictor(String feedPath, String retentionLimit, TimeZone timeZone,
                                   Path logFilePath) throws IOException, ELException, FalconException {
        Path normalizedPath = new Path(feedPath);
        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(normalizedPath.toUri());
        feedPath = normalizedPath.toUri().getPath();
        LOG.info("Normalized path: {}", feedPath);

        Pair<Date, Date> range = EvictionHelper.getDateRange(retentionLimit);

        List<Path> toBeDeleted = discoverInstanceToDelete(feedPath, timeZone, range.first, fs);
        if (toBeDeleted.isEmpty()) {
            LOG.info("No instances to delete.");
            return;
        }

        DateFormat dateFormat = new SimpleDateFormat(FeedHelper.FORMAT);
        dateFormat.setTimeZone(timeZone);
        Path feedBasePath = fs.makeQualified(FeedHelper.getFeedBasePath(feedPath));
        for (Path path : toBeDeleted) {
deleteInstance(fs, path, feedBasePath); - Date date = FeedHelper.getDate(feedPath, new Path(path.toUri().getPath()), timeZone); - instanceDates.append(dateFormat.format(date)).append(','); - instancePaths.append(path).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR); - } - } - - private List<Path> discoverInstanceToDelete(String inPath, TimeZone timeZone, Date start, FileSystem fs) - throws IOException { - FileStatus[] files = findFilesForFeed(fs, inPath); - if (files == null || files.length == 0) { - return Collections.emptyList(); - } - - List<Path> toBeDeleted = new ArrayList<Path>(); - for (FileStatus file : files) { - Date date = FeedHelper.getDate(inPath, new Path(file.getPath().toUri().getPath()), timeZone); - LOG.debug("Considering {}", file.getPath().toUri().getPath()); - LOG.debug("Date: {}", date); - if (date != null && !isDateInRange(date, start)) { - toBeDeleted.add(file.getPath()); - } - } - return toBeDeleted; - } - - private FileStatus[] findFilesForFeed(FileSystem fs, String feedBasePath) throws IOException { - Matcher matcher = FeedDataPath.PATTERN.matcher(feedBasePath); - while (matcher.find()) { - String var = feedBasePath.substring(matcher.start(), matcher.end()); - feedBasePath = feedBasePath.replaceAll(Pattern.quote(var), "*"); - matcher = FeedDataPath.PATTERN.matcher(feedBasePath); - } - LOG.info("Searching for {}", feedBasePath); - return fs.globStatus(new Path(feedBasePath)); - } - - private boolean isDateInRange(Date date, Date start) { - //ignore end ( && date.compareTo(end) <= 0 ) - return date.compareTo(start) >= 0; - } - - private void deleteInstance(FileSystem fs, Path path, Path feedBasePath) throws IOException { - if (fs.delete(path, true)) { - LOG.info("Deleted instance: {}", path); - }else{ - throw new IOException("Unable to delete instance: " + path); - } - deleteParentIfEmpty(fs, path.getParent(), feedBasePath); - } - - private void deleteParentIfEmpty(FileSystem fs, Path parent, Path feedBasePath) throws IOException { - if 
(feedBasePath.equals(parent)) {
            // Reached the feed's base directory — never delete it.
            LOG.info("Not deleting feed base path: {}", parent);
        } else {
            FileStatus[] files = fs.listStatus(parent);
            if (files != null && files.length == 0) {
                LOG.info("Parent path: {} is empty, deleting path", parent);
                if (fs.delete(parent, true)) {
                    LOG.info("Deleted empty dir: {}", parent);
                } else {
                    throw new IOException("Unable to delete parent path:" + parent);
                }
                // Recurse: the grandparent may now be empty too.
                deleteParentIfEmpty(fs, parent.getParent(), feedBasePath);
            }
        }
    }

    /**
     * Builds an availability listing for every feed instance between {@code start}
     * and {@code end} (inclusive), stepping from the first frequency-aligned
     * instance time by the feed's frequency. For each instance time, the templated
     * location path is materialized with time variables plus cluster/feed
     * properties, and the resulting path is probed on the filesystem.
     *
     * @param feed         feed entity whose instances are listed
     * @param clusterName  cluster whose location and validity are used
     * @param locationType which location (e.g. data) of the feed to inspect
     * @param start        listing window start (aligned forward to an instance time)
     * @param end          listing window end, inclusive
     * @return one FeedInstanceStatus per instance time in the window
     * @throws FalconException on filesystem errors while probing paths
     */
    @Override
    @SuppressWarnings("MagicConstant") // calendar.add takes the int from getCalendarUnit()
    public List<FeedInstanceStatus> getListing(Feed feed, String clusterName, LocationType locationType,
                                               Date start, Date end) throws FalconException {

        Calendar calendar = Calendar.getInstance();
        List<Location> clusterSpecificLocation = FeedHelper.
                getLocations(FeedHelper.getCluster(feed, clusterName), feed);
        Location location = getLocation(clusterSpecificLocation, locationType);
        try {
            FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(getConf());
            Cluster cluster = ClusterHelper.getCluster(clusterName);
            // Feed properties override cluster properties on key collisions (putAll).
            Properties baseProperties = FeedHelper.getClusterProperties(cluster);
            baseProperties.putAll(FeedHelper.getFeedProperties(feed));
            List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>();
            Date feedStart = FeedHelper.getCluster(feed, clusterName).getValidity().getStart();
            TimeZone tz = feed.getTimezone();
            // Align the requested start forward onto the feed's instance schedule.
            Date alignedStart = EntityUtil.getNextStartTime(feedStart, feed.getFrequency(), tz, start);

            String basePath = location.getPath();
            while (!end.before(alignedStart)) {
                // Substitute time variables (and base properties) into the templated path.
                Properties allProperties = ExpressionHelper.getTimeVariables(alignedStart, tz);
                allProperties.putAll(baseProperties);
                String feedInstancePath = ExpressionHelper.substitute(basePath, allProperties);
                FileStatus fileStatus = getFileStatus(fileSystem, new Path(feedInstancePath));
                FeedInstanceStatus instance = new FeedInstanceStatus(feedInstancePath);

                Date date = FeedHelper.getDate(basePath, new
Path(feedInstancePath), tz);
                instance.setInstance(SchemaHelper.formatDateUTC(date));
                // fileStatus == null means the instance path does not exist (MISSING default).
                if (fileStatus != null) {
                    instance.setCreationTime(fileStatus.getModificationTime());
                    ContentSummary contentSummary = fileSystem.getContentSummary(fileStatus.getPath());
                    if (contentSummary != null) {
                        long size = contentSummary.getSpaceConsumed();
                        instance.setSize(size);
                        if (!StringUtils.isEmpty(feed.getAvailabilityFlag())) {
                            // Availability flag configured: the done-file's presence decides
                            // AVAILABLE vs PARTIAL regardless of size.
                            FileStatus doneFile = getFileStatus(fileSystem,
                                    new Path(fileStatus.getPath(), feed.getAvailabilityFlag()));
                            if (doneFile != null) {
                                instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE);
                            } else {
                                instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL);
                            }
                        } else {
                            // No flag configured: non-zero space consumed means AVAILABLE.
                            instance.setStatus(size > 0 ? FeedInstanceStatus.AvailabilityStatus.AVAILABLE
                                    : FeedInstanceStatus.AvailabilityStatus.EMPTY);
                        }
                    }
                }
                instances.add(instance);
                // Advance to the next instance time by one feed frequency unit.
                calendar.setTime(alignedStart);
                calendar.add(feed.getFrequency().getTimeUnit().getCalendarUnit(),
                        feed.getFrequency().getFrequencyAsInt());
                alignedStart = calendar.getTime();
            }
            return instances;
        } catch (IOException e) {
            LOG.error("Unable to retrieve listing for {}:{}", locationType, getStorageUrl(), e);
            throw new FalconException("Unable to retrieve listing for (URI " + getStorageUrl() + ")", e);
        }
    }

    /**
     * Availability status of the single feed instance at {@code instanceTime},
     * implemented as a one-instant listing window. Returns MISSING when the
     * listing yields nothing, otherwise the first (only) instance's status.
     */
    @Override
    public FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName,
                                                                               LocationType locationType,
                                                                               Date instanceTime)
            throws FalconException {

        List<FeedInstanceStatus> result = getListing(feed, clusterName, locationType, instanceTime, instanceTime);
        if (result.isEmpty()) {
            return FeedInstanceStatus.AvailabilityStatus.MISSING;
        } else {
            return result.get(0).getStatus();
        }
    }

    /**
     * Returns the FileStatus for the given path, or null when the lookup throws
     * (e.g. the path does not exist) — the IOException is deliberately swallowed.
     */
    public FileStatus getFileStatus(FileSystem fileSystem, Path feedInstancePath) {
        FileStatus fileStatus = null;
        try {
            fileStatus = fileSystem.getFileStatus(feedInstancePath);
        } catch (IOException ignore) {
            // ignore: a missing/unreadable path is reported as null (treated as absent)
}
        return fileStatus;
    }

    /**
     * Builds a Hadoop Configuration whose default filesystem is this storage's URL.
     */
    public Configuration getConf() {
        Configuration conf = new Configuration();
        conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
        return conf;
    }

    /**
     * Returns the static prefix of the location path: everything before the first
     * templated variable, or the whole path when it contains no variables.
     */
    private String getRelativePath(Location location) {
        // if the path contains variables, locate on the "parent" path (just before first variable usage)
        Matcher matcher = FeedDataPath.PATTERN.matcher(location.getPath());
        boolean timedPath = matcher.find();
        if (timedPath) {
            return location.getPath().substring(0, matcher.start());
        }
        return location.getPath();
    }

    /** Diagnostic representation: storage URL plus configured locations. */
    @Override
    public String toString() {
        return "FileSystemStorage{"
                + "storageUrl='" + storageUrl + '\''
                + ", locations=" + locations
                + '}';
    }
}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/HiveUtil.java b/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
deleted file mode 100644
index f4029e4..0000000
--- a/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
+++ /dev/null
@@ -1,62 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.security.SecurityUtil; - -import java.util.Properties; - -/** - * Hive Utilities. - */ -public final class HiveUtil { - public static final String METASTOREURIS = "hive.metastore.uris"; - public static final String METASTROE_URI = "hcat.metastore.uri"; - public static final String NODE = "hcatNode"; - public static final String METASTORE_UGI = "hive.metastore.execute.setugi"; - - private HiveUtil() { - - } - - public static Properties getHiveCredentials(Cluster cluster) { - String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster); - if (StringUtils.isBlank(metaStoreUrl)) { - throw new IllegalStateException( - "Registry interface is not defined in cluster: " + cluster.getName()); - } - - Properties hiveCredentials = new Properties(); - hiveCredentials.put(METASTOREURIS, metaStoreUrl); - hiveCredentials.put(METASTORE_UGI, "true"); - hiveCredentials.put(NODE, metaStoreUrl.replace("thrift", "hcat")); - hiveCredentials.put(METASTROE_URI, metaStoreUrl); - - if (SecurityUtil.isSecurityEnabled()) { - String principal = ClusterHelper - .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL); - hiveCredentials.put(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL, principal); - hiveCredentials.put(SecurityUtil.METASTORE_PRINCIPAL, principal); - hiveCredentials.put(SecurityUtil.METASTORE_USE_THRIFT_SASL, "true"); - } - return hiveCredentials; - } -}
