http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java deleted file mode 100644 index e4d9385..0000000 --- a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.v0; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Cluster; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.service.ConfigurationChangeListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - 
/**
 * An in-memory graph of entities and relationship among themselves.
 *
 * <p>Every relationship is stored in both directions: when a process or feed is
 * added, an edge is recorded from it to each entity it references and from each
 * referenced entity back to it.
 *
 * <p>NOTE(review): the per-node edge sets are plain {@link HashSet}s held inside a
 * {@link ConcurrentHashMap}; mutation of an individual set is not guarded here.
 * Confirm that listener callbacks are serialized by the caller.
 */
public final class EntityGraph implements ConfigurationChangeListener {

    private static final Logger LOG = LoggerFactory.getLogger(EntityGraph.class);

    private static EntityGraph instance = new EntityGraph();

    private Map<Node, Set<Node>> graph = new ConcurrentHashMap<Node, Set<Node>>();

    private EntityGraph() {
    }

    /**
     * @return the single shared graph instance.
     */
    public static EntityGraph get() {
        return instance;
    }

    /**
     * Looks up the entities connected to the given entity in the graph.
     *
     * @param entity entity whose neighbours are wanted
     * @return the neighbouring entities resolved through the configuration store,
     *         or {@code null} when the entity has no node in the graph
     * @throws FalconException if the configuration store lookup fails
     */
    public Set<Entity> getDependents(Entity entity) throws FalconException {
        Node entityNode = new Node(entity.getEntityType(), entity.getName());
        // One lookup instead of containsKey() + get(): the node may be removed
        // by another thread between the two calls.
        Set<Node> neighbours = graph.get(entityNode);
        if (neighbours == null) {
            return null;
        }

        ConfigurationStore store = ConfigurationStore.get();
        Set<Entity> dependents = new HashSet<Entity>();
        for (Node node : neighbours) {
            Entity dependentEntity = store.get(node.type, node.name);
            if (dependentEntity != null) {
                dependents.add(dependentEntity);
            } else {
                LOG.error("Dependent entity {} was not found in configuration store.", node);
            }
        }
        return dependents;
    }

    /**
     * Merges the edges contributed by a newly added process or feed into the graph.
     * Other entity types contribute no edges and are ignored.
     */
    @Override
    public void onAdd(Entity entity) throws FalconException {
        Map<Node, Set<Node>> nodeEdges = edgesFor(entity);
        if (nodeEdges == null) {
            return;
        }
        LOG.debug("Adding edges for {}: {}", entity.getName(), nodeEdges);

        for (Map.Entry<Node, Set<Node>> entry : nodeEdges.entrySet()) {
            LOG.debug("Adding edges : {} for {}", entry.getValue(), entry.getKey());
            Set<Node> existing = graph.get(entry.getKey());
            if (existing != null) {
                existing.addAll(entry.getValue());
            } else {
                graph.put(entry.getKey(), entry.getValue());
            }
        }
        LOG.debug("Merged edges to graph {}", entity.getName());
    }

    /**
     * Removes the edges contributed by a process or feed, dropping any node that
     * is left without edges.
     */
    @Override
    public void onRemove(Entity entity) throws FalconException {
        Map<Node, Set<Node>> nodeEdges = edgesFor(entity);
        if (nodeEdges == null) {
            return;
        }

        for (Map.Entry<Node, Set<Node>> entry : nodeEdges.entrySet()) {
            Set<Node> existing = graph.get(entry.getKey());
            if (existing == null) {
                continue;
            }
            existing.removeAll(entry.getValue());
            if (existing.isEmpty()) {
                graph.remove(entry.getKey());
            }
        }
    }

    @Override
    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
        onRemove(oldEntity);
        onAdd(newEntity);
    }

    @Override
    public void onReload(Entity entity) throws FalconException {
        onAdd(entity);
    }

    /**
     * Computes the edges contributed by an entity.
     *
     * @return the edge map for a process or feed, {@code null} for any other type
     */
    private Map<Node, Set<Node>> edgesFor(Entity entity) {
        switch (entity.getEntityType()) {
        case PROCESS:
            return getEdgesFor((Process) entity);
        case FEED:
            return getEdgesFor((Feed) entity);
        default:
            return null;
        }
    }

    /** Returns the edge set of {@code node}, creating an empty one on first use. */
    private static Set<Node> edgesOf(Map<Node, Set<Node>> nodeEdges, Node node) {
        Set<Node> edges = nodeEdges.get(node);
        if (edges == null) {
            edges = new HashSet<Node>();
            nodeEdges.put(node, edges);
        }
        return edges;
    }

    /** Records an edge between the two nodes in both directions. */
    private static void link(Map<Node, Set<Node>> nodeEdges, Node first, Node second) {
        edgesOf(nodeEdges, first).add(second);
        edgesOf(nodeEdges, second).add(first);
    }

    /** Builds the edges between a process and its input feeds, output feeds and clusters. */
    private Map<Node, Set<Node>> getEdgesFor(Process process) {
        Map<Node, Set<Node>> nodeEdges = new HashMap<Node, Set<Node>>();
        Node processNode = new Node(EntityType.PROCESS, process.getName());
        // The process node is present even when it references nothing.
        edgesOf(nodeEdges, processNode);

        if (process.getInputs() != null) {
            for (Input input : process.getInputs().getInputs()) {
                link(nodeEdges, processNode, new Node(EntityType.FEED, input.getFeed()));
            }
        }
        if (process.getOutputs() != null) {
            for (Output output : process.getOutputs().getOutputs()) {
                link(nodeEdges, processNode, new Node(EntityType.FEED, output.getFeed()));
            }
        }

        for (Cluster cluster : process.getClusters().getClusters()) {
            link(nodeEdges, processNode, new Node(EntityType.CLUSTER, cluster.getName()));
        }

        return nodeEdges;
    }

    /** Builds the edges between a feed, its clusters and any import datasources. */
    private Map<Node, Set<Node>> getEdgesFor(Feed feed) {
        Map<Node, Set<Node>> nodeEdges = new HashMap<Node, Set<Node>>();
        Node feedNode = new Node(EntityType.FEED, feed.getName());
        // The feed node is present even when it references nothing.
        edgesOf(nodeEdges, feedNode);

        for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
            link(nodeEdges, feedNode, new Node(EntityType.CLUSTER, cluster.getName()));

            if (FeedHelper.isImportEnabled(cluster)) {
                link(nodeEdges, feedNode,
                        new Node(EntityType.DATASOURCE, FeedHelper.getImportDatasourceName(cluster)));
            }
        }
        return nodeEdges;
    }

    /**
     * Node element in the graph, identified by entity type and name.
     */
    private static final class Node {

        private final EntityType type;
        private final String name;

        private Node(EntityType type, String name) {
            this.type = type;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            Node node = (Node) o;
            if (type != node.type) {
                return false;
            }
            return name != null ? name.equals(node.name) : node.name == null;
        }

        @Override
        public int hashCode() {
            int result = type != null ? type.hashCode() : 0;
            result = 31 * result + (name != null ? name.hashCode() : 0);
            return result;
        }

        @Override
        public String toString() {
            return "(" + type + ") " + name;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java deleted file mode 100644 index 4c7e913..0000000 --- a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity.v0; - -import org.apache.falcon.FalconException; -import org.apache.falcon.Pair; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; - -/** - * Helper methods to check integrity of entity. 
- */
public final class EntityIntegrityChecker {

    private EntityIntegrityChecker() {}

    /**
     * Lists the entities that refer to the given entity.
     *
     * @param entity entity being checked
     * @return name/type pairs of the referring entities: feeds and processes for a
     *         cluster, processes for a feed, feeds for a datasource; {@code null}
     *         when the entity graph holds no node for the entity or the entity is
     *         of any other type
     * @throws FalconException if the entity graph lookup fails
     */
    public static Pair<String, EntityType>[] referencedBy(Entity entity) throws FalconException {
        Set<Entity> dependents = EntityGraph.get().getDependents(entity);
        if (dependents == null) {
            return null;
        }

        final EntityType[] referrerTypes;
        switch (entity.getEntityType()) {
        case CLUSTER:
            referrerTypes = new EntityType[] {EntityType.FEED, EntityType.PROCESS};
            break;

        case FEED:
            referrerTypes = new EntityType[] {EntityType.PROCESS};
            break;

        case DATASOURCE:
            referrerTypes = new EntityType[] {EntityType.FEED};
            break;

        default:
            return null;
        }
        return filter(dependents, referrerTypes);
    }

    /**
     * Keeps only the dependents whose type is one of {@code types} and converts
     * them to name/type pairs.
     */
    @SuppressWarnings("unchecked")
    private static Pair<String, EntityType>[] filter(Set<Entity> deps, EntityType... types) {
        List<EntityType> wanted = Arrays.asList(types);
        List<Pair<String, EntityType>> matches = new ArrayList<Pair<String, EntityType>>();
        for (Entity candidate : deps) {
            EntityType candidateType = candidate.getEntityType();
            if (!wanted.contains(candidateType)) {
                continue;
            }
            matches.add(Pair.of(candidate.getName(), candidateType));
        }
        return matches.toArray(new Pair[0]);
    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java b/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java deleted file mode 100644 index cad196b..0000000 --- a/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity.v0; - -import org.apache.falcon.FalconException; - 
/**
 * Thrown when a scheduling action such as Schedule, Suspend or Resume is
 * attempted on an entity that cannot be scheduled, for example a CLUSTER.
 */
public class UnschedulableEntityException extends FalconException {

    private static final long serialVersionUID = -1134342662497698943L;

    /**
     * @param message description of the failure
     */
    public UnschedulableEntityException(String message) {
        super(message);
    }

    /**
     * @param message description of the failure
     * @param e underlying cause
     */
    public UnschedulableEntityException(String message, Exception e) {
        super(message, e);
    }

    /**
     * @param e underlying cause
     */
    public UnschedulableEntityException(Exception e) {
        super(e);
    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java deleted file mode 100644 index 65aaeba..0000000 --- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.expression; - -import org.apache.commons.el.ExpressionEvaluatorImpl; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.common.FeedDataPath; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.jsp.el.ELException; -import javax.servlet.jsp.el.ExpressionEvaluator; -import javax.servlet.jsp.el.FunctionMapper; -import javax.servlet.jsp.el.VariableResolver; -import java.lang.reflect.Method; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.Properties; -import java.util.TimeZone; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Helper for evaluating expressions. 
- */ -public final class ExpressionHelper implements FunctionMapper, VariableResolver { - - private static final Logger LOG = LoggerFactory.getLogger(ExpressionHelper.class); - private static final ExpressionHelper INSTANCE = new ExpressionHelper(); - - private static final ThreadLocal<Properties> THREAD_VARIABLES = new ThreadLocal<Properties>(); - - private static final Pattern SYS_PROPERTY_PATTERN = Pattern.compile("\\$\\{[A-Za-z0-9_.]+\\}"); - - private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl(); - private static final ExpressionHelper RESOLVER = ExpressionHelper.get(); - - public static final ThreadLocal<SimpleDateFormat> FORMATTER = new ThreadLocal<SimpleDateFormat>() { - @Override - protected SimpleDateFormat initialValue() { - SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'"); - format.setTimeZone(TimeZone.getTimeZone("UTC")); - return format; - } - }; - - public static ExpressionHelper get() { - return INSTANCE; - } - - private enum DayOfWeek { - SUN, MON, TUE, WED, THU, FRI, SAT - } - - private ExpressionHelper() { - } - - public <T> T evaluate(String expression, Class<T> clazz) throws FalconException { - return evaluateFullExpression("${" + expression + "}", clazz); - } - - @SuppressWarnings("unchecked") - public <T> T evaluateFullExpression(String expression, Class<T> clazz) throws FalconException { - try { - return (T) EVALUATOR.evaluate(expression, clazz, RESOLVER, RESOLVER); - } catch (ELException e) { - throw new FalconException("Unable to evaluate " + expression, e); - } - } - - @Override - public Method resolveFunction(String prefix, String name) { - for (Method method : ExpressionHelper.class.getDeclaredMethods()) { - if (method.getName().equals(name)) { - return method; - } - } - throw new UnsupportedOperationException("Not found " + prefix + ":" + name); - } - - public void setPropertiesForVariable(Properties properties) { - THREAD_VARIABLES.set(properties); - } - - @Override - public Object 
resolveVariable(String field) { - return THREAD_VARIABLES.get().get(field); - } - - private static ThreadLocal<Date> referenceDate = new ThreadLocal<Date>(); - - public static void setReferenceDate(Date date) { - referenceDate.set(date); - Properties variables = getTimeVariables(date, TimeZone.getTimeZone("UTC")); - THREAD_VARIABLES.set(variables); - } - - public static Properties getTimeVariables(Date date, TimeZone tz) { - Properties vars = new Properties(); - Calendar cal = Calendar.getInstance(tz); - cal.setTime(date); - vars.put(FeedDataPath.VARS.YEAR.name(), String.format("%04d", cal.get(Calendar.YEAR))); - vars.put(FeedDataPath.VARS.MONTH.name(), String.format("%02d", (cal.get(Calendar.MONTH) + 1))); - vars.put(FeedDataPath.VARS.DAY.name(), String.format("%02d", cal.get(Calendar.DAY_OF_MONTH))); - vars.put(FeedDataPath.VARS.HOUR.name(), String.format("%02d", cal.get(Calendar.HOUR_OF_DAY))); - vars.put(FeedDataPath.VARS.MINUTE.name(), String.format("%02d", cal.get(Calendar.MINUTE))); - return vars; - } - - private static int getDayOffset(String weekDayName) { - int day; - Calendar nominalTime = Calendar.getInstance(); - nominalTime.setTimeZone(TimeZone.getTimeZone("UTC")); - nominalTime.setTime(referenceDate.get()); - int currentWeekDay = nominalTime.get(Calendar.DAY_OF_WEEK); - int weekDay = DayOfWeek.valueOf(weekDayName).ordinal() + 1; //to map to Calendar.SUNDAY ... 
- day = weekDay - currentWeekDay; - if (weekDay > currentWeekDay) { - day = day - 7; - } - return day; - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SF_SWITCH_FALLTHROUGH"}) - private static Date getRelative(Date date, int boundary, int month, int day, int hour, int minute) { - Calendar dsInstanceCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - dsInstanceCal.setTime(date); - switch (boundary) { - case Calendar.YEAR: - dsInstanceCal.set(Calendar.MONTH, 0); - case Calendar.MONTH: - dsInstanceCal.set(Calendar.DAY_OF_MONTH, 1); - case Calendar.DAY_OF_MONTH: - dsInstanceCal.set(Calendar.HOUR_OF_DAY, 0); - case Calendar.HOUR: - dsInstanceCal.set(Calendar.MINUTE, 0); - dsInstanceCal.set(Calendar.SECOND, 0); - dsInstanceCal.set(Calendar.MILLISECOND, 0); - break; - case Calendar.SECOND: - break; - default: - throw new IllegalArgumentException("Invalid boundary " + boundary); - } - - dsInstanceCal.add(Calendar.YEAR, 0); - dsInstanceCal.add(Calendar.MONTH, month); - dsInstanceCal.add(Calendar.DAY_OF_MONTH, day); - dsInstanceCal.add(Calendar.HOUR_OF_DAY, hour); - dsInstanceCal.add(Calendar.MINUTE, minute); - return dsInstanceCal.getTime(); - } - - public static Date now(int hour, int minute) { - return getRelative(referenceDate.get(), Calendar.SECOND, 0, 0, hour, minute); - } - - public static Date today(int hour, int minute) { - return getRelative(referenceDate.get(), Calendar.DAY_OF_MONTH, 0, 0, hour, minute); - } - - public static Date yesterday(int hour, int minute) { - return getRelative(referenceDate.get(), Calendar.DAY_OF_MONTH, 0, -1, hour, minute); - } - - public static Date currentMonth(int day, int hour, int minute) { - return getRelative(referenceDate.get(), Calendar.MONTH, 0, day, hour, minute); - } - - public static Date lastMonth(int day, int hour, int minute) { - return getRelative(referenceDate.get(), Calendar.MONTH, -1, day, hour, minute); - } - - public static Date currentWeek(String weekDay, int hour, int minute) { - int day = 
getDayOffset(weekDay); - return getRelative(referenceDate.get(), Calendar.DAY_OF_MONTH, 0, day, hour, minute); - } - - public static Date lastWeek(String weekDay, int hour, int minute) { - int day = getDayOffset(weekDay); - return getRelative(referenceDate.get(), Calendar.DAY_OF_MONTH, 0, day - 7, hour, minute); - } - - public static Date currentYear(int month, int day, int hour, int minute) { - return getRelative(referenceDate.get(), Calendar.YEAR, month, day, hour, minute); - } - - public static Date lastYear(int month, int day, int hour, int minute) { - return getRelative(referenceDate.get(), Calendar.YEAR, month - 12, day, hour, minute); - } - - public static Date latest(int n) { - //by pass Falcon validations - return referenceDate.get(); - } - - public static Date future(int n, int limit) { - //by pass Falcon validations - return referenceDate.get(); - } - - public static long hours(int val) { - return TimeUnit.HOURS.toMillis(val); - } - - public static long minutes(int val) { - return TimeUnit.MINUTES.toMillis(val); - } - - public static long days(int val) { - return TimeUnit.DAYS.toMillis(val); - } - - public static long months(int val) { - return val * days(31); - } - - public static long years(int val) { - return val * days(366); - } - - public static String substitute(String originalValue) { - return substitute(originalValue, System.getProperties()); - } - - public static String substitute(String originalValue, Properties properties) { - Matcher envVarMatcher = SYS_PROPERTY_PATTERN.matcher(originalValue); - while (envVarMatcher.find()) { - String envVar = originalValue.substring(envVarMatcher.start() + 2, - envVarMatcher.end() - 1); - String envVal = properties.getProperty(envVar, System.getenv(envVar)); - - envVar = "\\$\\{" + envVar + "\\}"; - if (envVal != null) { - originalValue = originalValue.replaceAll(envVar, Matcher.quoteReplacement(envVal)); - envVarMatcher = SYS_PROPERTY_PATTERN.matcher(originalValue); - } - } - return originalValue; - } - -} 
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/group/FeedGroup.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java deleted file mode 100644 index d288925..0000000 --- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.group; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.common.FeedDataPath; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.feed.LocationType; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.regex.Matcher; - -/** - * Group, which represents a logical group of feeds which can belong to this - * group. 
- */
public class FeedGroup {

    private String name;
    private Frequency frequency;
    private String datePattern;
    private Set<String> feeds;

    /**
     * Creates an empty group.
     *
     * @param group group name
     * @param frequency frequency shared by the feeds of the group
     * @param path data path whose date variables define the group's date pattern
     */
    public FeedGroup(String group, Frequency frequency, String path) {
        this.name = group;
        this.frequency = frequency;
        this.datePattern = getDatePattern(path);
        this.feeds = Collections
                .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
    }

    /**
     * Collects every match of {@code FeedDataPath.PATTERN} in the path, sorts the
     * matches and returns the sorted list rendered as a string.
     */
    public static String getDatePattern(String path) {
        List<String> dateFields = new ArrayList<String>();
        for (Matcher finder = FeedDataPath.PATTERN.matcher(path); finder.find();) {
            dateFields.add(finder.group());
        }
        Collections.sort(dateFields);
        return dateFields.toString();
    }

    public String getName() {
        return name;
    }

    public Frequency getFrequency() {
        return frequency;
    }

    public String getDatePattern() {
        return datePattern;
    }

    /**
     * @return the live, mutable set of feed names in this group.
     */
    public Set<String> getFeeds() {
        return feeds;
    }

    /**
     * Tells whether a feed matches this group's frequency and date pattern.
     *
     * @throws FalconException if the feed's storage cannot be created
     */
    public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
        if (!this.frequency.equals(feed.getFrequency())) {
            return false;
        }
        String feedPattern = getDatePattern(
                FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
        return this.datePattern.equals(feedPattern);
    }

    /**
     * Groups are equal when name, frequency and date pattern all match; the feeds
     * they contain are not compared.
     */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof FeedGroup)) {
            return false;
        }
        FeedGroup other = (FeedGroup) obj;
        if (!this.name.equals(other.getName())) {
            return false;
        }
        if (!this.frequency.equals(other.frequency)) {
            return false;
        }
        return this.datePattern.equals(other.datePattern);
    }

    @Override
    public int hashCode() {
        int nameHash = 127 * name.hashCode();
        int frequencyHash = 31 * frequency.hashCode();
        return nameHash + frequencyHash + datePattern.hashCode();
    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java 
b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java deleted file mode 100644 index a832366..0000000 --- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.group; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.service.ConfigurationChangeListener; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Has 2 way mappings from feed to group and group to feed. 
- */
public final class FeedGroupMap implements ConfigurationChangeListener {

    private static final FeedGroupMap INSTANCE = new FeedGroupMap();
    private Map<String, FeedGroup> groupsMapping = new ConcurrentHashMap<String, FeedGroup>();

    private FeedGroupMap() {
        // singleton
    }

    /**
     * @return the single shared map instance.
     */
    public static FeedGroupMap get() {
        return INSTANCE;
    }

    /**
     * @return a read-only view of the group-name to group mapping.
     */
    public Map<String, FeedGroup> getGroupsMapping() {
        return Collections.unmodifiableMap(groupsMapping);
    }

    /**
     * Registers a feed under each of its groups. Entities that are not feeds, and
     * feeds without groups, are ignored.
     */
    @Override
    public void onAdd(Entity entity) throws FalconException {
        if (entity.getEntityType().equals(EntityType.FEED)) {
            Feed feed = (Feed) entity;
            if (StringUtils.isEmpty(feed.getGroups())) {
                return;
            }
            Set<FeedGroup> groupSet = getGroups(feed);
            addGroups(feed.getName(), groupSet);
        }
    }

    /**
     * Removes a feed from each of its groups and drops groups that become empty.
     * Groups that are not registered are skipped.
     */
    @Override
    public void onRemove(Entity entity) throws FalconException {
        if (entity.getEntityType().equals(EntityType.FEED)) {
            Feed feed = (Feed) entity;
            if (StringUtils.isEmpty(feed.getGroups())) {
                return;
            }
            for (String group : feed.getGroups().split(",")) {
                FeedGroup registered = groupsMapping.get(group);
                if (registered == null) {
                    // Never registered or already removed: nothing to clean up.
                    continue;
                }
                registered.getFeeds().remove(entity.getName());
                if (registered.getFeeds().isEmpty()) {
                    groupsMapping.remove(group);
                }
            }
        }
    }

    @Override
    public void onChange(Entity oldEntity, Entity newEntity)
        throws FalconException {

        onRemove(oldEntity);
        onAdd(newEntity);
    }

    @Override
    public void onReload(Entity entity) throws FalconException {
        onAdd(entity);
    }

    /**
     * Adds the feed to the already registered group of the same name, or
     * registers the given group when no group of that name exists yet.
     */
    private void addGroups(String feed, Set<FeedGroup> groups) {
        for (FeedGroup group : groups) {
            FeedGroup registered = groupsMapping.get(group.getName());
            if (registered != null) {
                registered.getFeeds().add(feed);
            } else {
                group.getFeeds().add(feed);
                groupsMapping.put(group.getName(), group);
            }
        }
    }

    /**
     * Builds one group per name in a comma-separated list.
     *
     * @param groups comma-separated group names
     * @param frequency frequency assigned to every group
     * @param path data path used to derive each group's date pattern
     * @return the new, unregistered groups
     */
    public Set<FeedGroup> getGroups(String groups, Frequency frequency, String path) {
        Set<FeedGroup> groupSet = new HashSet<FeedGroup>();
        String[] groupArray = groups.split(",");
        for (String group : groupArray) {
            groupSet.add(new FeedGroup(group, frequency, path));
        }
        return groupSet;
    }

    /**
     * Builds the groups of a feed from its group list, frequency and data location template.
     *
     * @throws FalconException if the feed's storage cannot be created
     */
    public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
        return getGroups(feed.getGroups(), feed.getFrequency(),
                FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java deleted file mode 100644 index e33d353..0000000 --- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.hadoop; - -import org.apache.commons.lang.Validate; -import org.apache.falcon.FalconException; -import org.apache.falcon.security.CurrentUser; -import org.apache.falcon.security.SecurityUtil; -import org.apache.falcon.util.StartupProperties; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.security.PrivilegedExceptionAction; - -/** - * A factory implementation to dole out FileSystem handles based on the logged in user. - */ -public final class HadoopClientFactory { - - private static final Logger LOG = LoggerFactory.getLogger(HadoopClientFactory.class); - - public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; - public static final String MR_JT_ADDRESS_KEY = "mapreduce.jobtracker.address"; - public static final String YARN_RM_ADDRESS_KEY = "yarn.resourcemanager.address"; - - public static final FsPermission READ_EXECUTE_PERMISSION = - new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE); - public static final FsPermission ALL_PERMISSION = - new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL); - - private static final HadoopClientFactory INSTANCE = new HadoopClientFactory(); - - private HadoopClientFactory() { - } - - public static HadoopClientFactory get() { - return INSTANCE; - } - - /** - * This method is only used by Falcon internally to talk to the config store on HDFS. 
- * - * @param uri file system URI for config store. - * @return FileSystem created with the provided proxyUser/group. - * @throws org.apache.falcon.FalconException - * if the filesystem could not be created. - */ - public FileSystem createFalconFileSystem(final URI uri) throws FalconException { - Validate.notNull(uri, "uri cannot be null"); - - try { - Configuration conf = new Configuration(); - if (UserGroupInformation.isSecurityEnabled()) { - conf.set(SecurityUtil.NN_PRINCIPAL, StartupProperties.get().getProperty(SecurityUtil.NN_PRINCIPAL)); - } - - return createFileSystem(UserGroupInformation.getLoginUser(), uri, conf); - } catch (IOException e) { - throw new FalconException("Exception while getting FileSystem for: " + uri, e); - } - } - - /** - * This method is only used by Falcon internally to talk to the config store on HDFS. - * - * @param conf configuration. - * @return FileSystem created with the provided proxyUser/group. - * @throws org.apache.falcon.FalconException - * if the filesystem could not be created. - */ - public FileSystem createFalconFileSystem(final Configuration conf) - throws FalconException { - Validate.notNull(conf, "configuration cannot be null"); - - String nameNode = getNameNode(conf); - try { - return createFileSystem(UserGroupInformation.getLoginUser(), new URI(nameNode), conf); - } catch (URISyntaxException e) { - throw new FalconException("Exception while getting FileSystem for: " + nameNode, e); - } catch (IOException e) { - throw new FalconException("Exception while getting FileSystem for: " + nameNode, e); - } - } - - /** - * Return a FileSystem created with the authenticated proxy user for the specified conf. - * - * @param conf Configuration with all necessary information to create the FileSystem. - * @return FileSystem created with the provided proxyUser/group. - * @throws org.apache.falcon.FalconException - * if the filesystem could not be created. 
- */ - public FileSystem createProxiedFileSystem(final Configuration conf) - throws FalconException { - Validate.notNull(conf, "configuration cannot be null"); - - String nameNode = getNameNode(conf); - try { - return createProxiedFileSystem(new URI(nameNode), conf); - } catch (URISyntaxException e) { - throw new FalconException("Exception while getting FileSystem for: " + nameNode, e); - } - } - - private static String getNameNode(Configuration conf) { - return conf.get(FS_DEFAULT_NAME_KEY); - } - - /** - * This method is called from with in a workflow execution context. - * - * @param uri uri - * @return file system handle - * @throws FalconException - */ - public FileSystem createProxiedFileSystem(final URI uri) throws FalconException { - return createProxiedFileSystem(uri, new Configuration()); - } - - public FileSystem createProxiedFileSystem(final URI uri, - final Configuration conf) throws FalconException { - Validate.notNull(uri, "uri cannot be null"); - - try { - return createFileSystem(CurrentUser.getProxyUGI(), uri, conf); - } catch (IOException e) { - throw new FalconException("Exception while getting FileSystem for proxy: " - + CurrentUser.getUser(), e); - } - } - - /** - * Return a FileSystem created with the provided user for the specified URI. - * - * @param ugi user group information - * @param uri file system URI. - * @param conf Configuration with all necessary information to create the FileSystem. - * @return FileSystem created with the provided user/group. - * @throws org.apache.falcon.FalconException - * if the filesystem could not be created. 
- */ - @SuppressWarnings("ResultOfMethodCallIgnored") - public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri, - final Configuration conf) throws FalconException { - Validate.notNull(ugi, "ugi cannot be null"); - Validate.notNull(conf, "configuration cannot be null"); - - try { - if (UserGroupInformation.isSecurityEnabled()) { - ugi.checkTGTAndReloginFromKeytab(); - } - } catch (IOException ioe) { - throw new FalconException("Exception while getting FileSystem. Unable to check TGT for user " - + ugi.getShortUserName(), ioe); - } - - validateNameNode(uri, conf); - - try { - // prevent falcon impersonating falcon, no need to use doas - final String proxyUserName = ugi.getShortUserName(); - if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) { - LOG.info("Creating FS for the login user {}, impersonation not required", - proxyUserName); - return FileSystem.get(uri, conf); - } - - LOG.info("Creating FS impersonating user {}", proxyUserName); - return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { - public FileSystem run() throws Exception { - return FileSystem.get(uri, conf); - } - }); - } catch (InterruptedException ex) { - throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex); - } catch (IOException ex) { - throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex); - } - } - - /** - * This method validates if the execute url is able to reach the MR endpoint. 
- * - * @param executeUrl jt url or RM url - * @throws IOException - */ - public void validateJobClient(String executeUrl) throws IOException { - final JobConf jobConf = new JobConf(); - jobConf.set(MR_JT_ADDRESS_KEY, executeUrl); - jobConf.set(YARN_RM_ADDRESS_KEY, executeUrl); - - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - try { - JobClient jobClient = loginUser.doAs(new PrivilegedExceptionAction<JobClient>() { - public JobClient run() throws Exception { - return new JobClient(jobConf); - } - }); - - jobClient.getClusterStatus().getMapTasks(); - } catch (InterruptedException e) { - throw new IOException("Exception creating job client:" + e.getMessage(), e); - } - } - - public static FsPermission getDirDefaultPermission(Configuration conf) { - return getDirDefault().applyUMask(FsPermission.getUMask(conf)); - } - - public static FsPermission getFileDefaultPermission(Configuration conf) { - return getFileDefault().applyUMask(FsPermission.getUMask(conf)); - } - - public static FsPermission getDirDefault() { - return new FsPermission((short)511); - } - - public static FsPermission getFileDefault() { - return new FsPermission((short)438); - } - - public static void mkdirsWithDefaultPerms(FileSystem fs, Path path) throws IOException { - mkdirs(fs, path, getDirDefaultPermission(fs.getConf())); - } - - public static void mkdirs(FileSystem fs, Path path, - FsPermission permission) throws IOException { - if (!FileSystem.mkdirs(fs, path, permission)) { - throw new IOException("mkdir failed for " + path); - } - } - - private void validateNameNode(URI uri, Configuration conf) throws FalconException { - String nameNode = uri.getAuthority(); - if (nameNode == null) { - nameNode = getNameNode(conf); - if (nameNode != null) { - try { - new URI(nameNode).getAuthority(); - } catch (URISyntaxException ex) { - throw new FalconException("Exception while getting FileSystem", ex); - } - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java b/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java deleted file mode 100644 index 5bcc2f8..0000000 --- a/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.lifecycle; - -import org.apache.falcon.FalconException; - -/** - * Abstract factory class for feed lifecycle policy builders. 
- */ -public abstract class AbstractPolicyBuilderFactory { - - public abstract PolicyBuilder getPolicyBuilder(String policyName) throws FalconException; - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java b/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java deleted file mode 100644 index 833ad04..0000000 --- a/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.lifecycle; - -/** - * Enum for valid lifecycle stages for the feed. 
/**
 * Enum for valid lifecycle stages for the feed.
 *
 * <p>Each stage carries the name of the policy used for it by default.
 */
public enum FeedLifecycleStage {

    /** Retention stage; its default policy is named {@code AgeBasedDelete}. */
    RETENTION("AgeBasedDelete");

    // Enum constants are shared singletons, so their state must never change after construction.
    private final String defaultPolicyName;

    FeedLifecycleStage(String defaultPolicyName) {
        this.defaultPolicyName = defaultPolicyName;
    }

    /**
     * @return the name of the default policy for this stage.
     */
    public String getDefaultPolicyName() {
        return defaultPolicyName;
    }

}
- */ -public interface LifecyclePolicy { - - /** - * Returns the name of the policy. Name of policy must be unique as it is used as an identifier. - * @return name of the policy - */ - String getName(); - - /** - * Returns the stage to which the policy belongs. - * @return stage to which the policy belongs. - */ - FeedLifecycleStage getStage(); - - /** - * Validates the configurations as per this policy. - * @param feed Parent feed for which the policy is configured. - * @param clusterName cluster to be used as context for validation. - * @throws FalconException - */ - void validate(Feed feed, String clusterName) throws FalconException; - - /** - * Builds workflow engine artifacts. - * @param cluster cluster to be used as context - * @param buildPath base path to be used for storing the artifacts. - * @param feed Parent feed. - * @return Properties to be passed to the caller e.g. bundle in case of oozie workflow engine. - * @throws FalconException - */ - Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException; - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java b/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java deleted file mode 100644 index 5e5055b..0000000 --- a/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.lifecycle; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.hadoop.fs.Path; - -import java.util.Properties; - -/** - * Interface to be implemented by all policy builders for a lifecycle policy. - * A Builder builds workflow engine specific artifacts for a policy. - */ -public interface PolicyBuilder { - - Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException; - - String getPolicyName(); -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java deleted file mode 100644 index 8d735f9..0000000 --- a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.lifecycle.retention; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.parser.ValidationException; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.feed.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Property; -import org.apache.falcon.entity.v0.feed.RetentionStage; -import org.apache.falcon.entity.v0.feed.Sla; -import org.apache.falcon.expression.ExpressionHelper; -import org.apache.falcon.util.StartupProperties; - -import java.util.Date; - -/** - * Retention policy which deletes all instances of instance time older than a given time. - * It will create the workflow and coordinators for this policy. 
- */ -public class AgeBasedDelete extends RetentionPolicy { - - public static final String LIMIT_PROPERTY_NAME = "retention.policy.agebaseddelete.limit"; - - @Override - public void validate(Feed feed, String clusterName) throws FalconException { - // validate that it is a valid cluster - Cluster cluster = FeedHelper.getCluster(feed, clusterName); - Frequency retentionLimit = getRetentionLimit(feed, clusterName); - if (cluster != null) { - validateLimitWithSla(feed, cluster, retentionLimit.toString()); - validateLimitWithLateData(feed, cluster, retentionLimit.toString()); - String lifecycleEngine = StartupProperties.get().getProperty("lifecycle.engine.impl", - "org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory"); - if ("org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory".equals(lifecycleEngine)) { - validateRetentionFrequencyForOozie(feed, clusterName); - } - } - } - - - private void validateRetentionFrequencyForOozie(Feed feed, String clusterName) throws FalconException { - // retention shouldn't be more frequent than hours(1) for Oozie Builders. 
- Frequency retentionFrequency = FeedHelper.getLifecycleRetentionFrequency(feed, clusterName); - if (retentionFrequency.getTimeUnit() == Frequency.TimeUnit.minutes - && retentionFrequency.getFrequencyAsInt() < 60) { - throw new ValidationException("Feed Retention can not be more frequent than hours(1)"); - } - } - - private void validateLimitWithLateData(Feed feed, Cluster cluster, String retention) throws FalconException { - ExpressionHelper evaluator = ExpressionHelper.get(); - long retentionPeriod = evaluator.evaluate(retention, Long.class); - - if (feed.getLateArrival() != null) { - String feedCutoff = feed.getLateArrival().getCutOff().toString(); - long feedCutOffPeriod = evaluator.evaluate(feedCutoff, Long.class); - if (retentionPeriod < feedCutOffPeriod) { - throw new ValidationException( - "Feed's retention limit: " + retention + " of referenced cluster " + cluster.getName() - + " should be more than feed's late arrival cut-off period: " + feedCutoff - + " for feed: " + feed.getName()); - } - } - } - - private void validateLimitWithSla(Feed feed, Cluster cluster, String retentionExpression) throws FalconException { - // test that slaHigh is less than retention - Sla clusterSla = FeedHelper.getSLA(cluster, feed); - if (clusterSla != null) { - ExpressionHelper evaluator = ExpressionHelper.get(); - ExpressionHelper.setReferenceDate(new Date()); - - Frequency slaHighExpression = clusterSla.getSlaHigh(); - Date slaHigh = new Date(evaluator.evaluate(slaHighExpression.toString(), Long.class)); - - Date retention = new Date(evaluator.evaluate(retentionExpression, Long.class)); - if (slaHigh.after(retention)) { - throw new ValidationException("slaHigh of Feed: " + slaHighExpression - + " is greater than retention of the feed: " + retentionExpression - + " for cluster: " + cluster.getName() - ); - } - } - } - - public Frequency getRetentionLimit(Feed feed, String clusterName) throws FalconException { - RetentionStage retention = FeedHelper.getRetentionStage(feed, 
clusterName); - if (retention != null) { - String limit = null; - for (Property property : retention.getProperties().getProperties()) { - if (StringUtils.equals(property.getName(), LIMIT_PROPERTY_NAME)) { - limit = property.getValue(); - } - } - if (limit == null) { - throw new FalconException("Property: " + LIMIT_PROPERTY_NAME + " is required for " - + getName() + " policy."); - } - try { - return new Frequency(limit); - } catch (IllegalArgumentException e) { - throw new FalconException("Invalid value for property: " + LIMIT_PROPERTY_NAME + ", should be a valid " - + "frequency e.g. hours(2)", e); - } - } else { - throw new FalconException("Cluster " + clusterName + " doesn't contain retention stage"); - } - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java deleted file mode 100644 index 7fd6175..0000000 --- a/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.lifecycle.retention; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory; -import org.apache.falcon.lifecycle.FeedLifecycleStage; -import org.apache.falcon.lifecycle.LifecyclePolicy; -import org.apache.falcon.lifecycle.PolicyBuilder; -import org.apache.falcon.workflow.WorkflowEngineFactory; -import org.apache.hadoop.fs.Path; - -import java.util.Properties; - -/** - * All retention policies must implement this interface. - */ -public abstract class RetentionPolicy implements LifecyclePolicy { - - @Override - public String getName() { - return this.getClass().getSimpleName(); - } - - @Override - public FeedLifecycleStage getStage() { - return FeedLifecycleStage.RETENTION; - } - - @Override - public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException { - AbstractPolicyBuilderFactory factory = WorkflowEngineFactory.getLifecycleEngine(); - PolicyBuilder builder = factory.getPolicyBuilder(getName()); - return builder.build(cluster, buildPath, feed); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java deleted file mode 100644 index 25bbf0c..0000000 --- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java +++ /dev/null @@ -1,514 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.metadata; - -import com.tinkerpop.blueprints.Graph; -import com.tinkerpop.blueprints.Vertex; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.ProcessHelper; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.datasource.Datasource; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Inputs; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Outputs; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.entity.v0.process.Workflow; -import org.apache.falcon.workflow.WorkflowExecutionArgs; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; - -/** - * Entity Metadata relationship mapping helper. 
- */ -public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { - - private static final Logger LOG = LoggerFactory.getLogger(EntityRelationshipGraphBuilder.class); - - - public EntityRelationshipGraphBuilder(Graph graph, boolean preserveHistory) { - super(graph, preserveHistory); - } - - public void addEntity(Entity entity) { - EntityType entityType = entity.getEntityType(); - switch (entityType) { - case CLUSTER: - addClusterEntity((Cluster) entity); - break; - case PROCESS: - addProcessEntity((Process) entity); - break; - case FEED: - addFeedEntity((Feed) entity); - break; - case DATASOURCE: - addDatasourceEntity((Datasource) entity); - break; - - default: - throw new IllegalArgumentException("Invalid EntityType " + entityType); - } - } - - public void addClusterEntity(Cluster clusterEntity) { - LOG.info("Adding cluster entity: {}", clusterEntity.getName()); - Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY); - - addUserRelation(clusterVertex); - addColoRelation(clusterEntity.getColo(), clusterVertex); - addDataClassification(clusterEntity.getTags(), clusterVertex); - } - - public void addFeedEntity(Feed feed) { - LOG.info("Adding feed entity: {}", feed.getName()); - Vertex feedVertex = addVertex(feed.getName(), RelationshipType.FEED_ENTITY); - - addUserRelation(feedVertex); - addDataClassification(feed.getTags(), feedVertex); - addGroups(feed.getGroups(), feedVertex); - - for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) { - if (ClusterType.TARGET != feedCluster.getType()) { - addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE); - } - } - - for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) { - if (FeedHelper.isImportEnabled(feedCluster)) { - addRelationToDatasource(feedVertex, FeedHelper.getImportDatasourceName(feedCluster), - RelationshipLabel.DATASOURCE_IMPORT_EDGE); - } 
- } - } - - public void addDatasourceEntity(Datasource dsEntity) { - LOG.info("Adding datasource entity: {}", dsEntity.getName()); - Vertex dsVertex = addVertex(dsEntity.getName(), RelationshipType.DATASOURCE_ENTITY); - - addUserRelation(dsVertex); - addColoRelation(dsEntity.getColo(), dsVertex); - addDataClassification(dsEntity.getTags(), dsVertex); - } - - - public void updateEntity(Entity oldEntity, Entity newEntity) { - EntityType entityType = oldEntity.getEntityType(); - switch (entityType) { - case CLUSTER: - // a cluster cannot be updated - break; - case PROCESS: - updateProcessEntity((Process) oldEntity, (Process) newEntity); - break; - case FEED: - updateFeedEntity((Feed) oldEntity, (Feed) newEntity); - break; - default: - throw new IllegalArgumentException("Invalid EntityType " + entityType); - } - } - - - - public void updateFeedEntity(Feed oldFeed, Feed newFeed) { - LOG.info("Updating feed entity: {}", newFeed.getName()); - Vertex feedEntityVertex = findVertex(oldFeed.getName(), RelationshipType.FEED_ENTITY); - if (feedEntityVertex == null) { - LOG.error("Illegal State: Feed entity vertex must exist for {}", oldFeed.getName()); - throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist."); - } - - updateDataClassification(oldFeed.getTags(), newFeed.getTags(), feedEntityVertex); - updateGroups(oldFeed.getGroups(), newFeed.getGroups(), feedEntityVertex); - updateFeedClusters(oldFeed.getClusters().getClusters(), - newFeed.getClusters().getClusters(), feedEntityVertex); - } - - public void addProcessEntity(Process process) { - String processName = process.getName(); - LOG.info("Adding process entity: {}", processName); - Vertex processVertex = addVertex(processName, RelationshipType.PROCESS_ENTITY); - addWorkflowProperties(process.getWorkflow(), processVertex, processName); - - addUserRelation(processVertex); - addDataClassification(process.getTags(), processVertex); - addPipelines(process.getPipelines(), processVertex); - - for 
(org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { - addRelationToCluster(processVertex, cluster.getName(), RelationshipLabel.PROCESS_CLUSTER_EDGE); - } - - addInputFeeds(process.getInputs(), processVertex); - addOutputFeeds(process.getOutputs(), processVertex); - } - - public void updateProcessEntity(Process oldProcess, Process newProcess) { - LOG.info("Updating process entity: {}", newProcess.getName()); - Vertex processEntityVertex = findVertex(oldProcess.getName(), RelationshipType.PROCESS_ENTITY); - if (processEntityVertex == null) { - LOG.error("Illegal State: Process entity vertex must exist for {}", oldProcess.getName()); - throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist"); - } - - updateWorkflowProperties(oldProcess.getWorkflow(), newProcess.getWorkflow(), - processEntityVertex, newProcess.getName()); - updateDataClassification(oldProcess.getTags(), newProcess.getTags(), processEntityVertex); - updatePipelines(oldProcess.getPipelines(), newProcess.getPipelines(), processEntityVertex); - updateProcessClusters(oldProcess.getClusters().getClusters(), - newProcess.getClusters().getClusters(), processEntityVertex); - updateProcessInputs(oldProcess.getInputs(), newProcess.getInputs(), processEntityVertex); - updateProcessOutputs(oldProcess.getOutputs(), newProcess.getOutputs(), processEntityVertex); - } - - public void addColoRelation(String colo, Vertex fromVertex) { - Vertex coloVertex = addVertex(colo, RelationshipType.COLO); - addEdge(fromVertex, coloVertex, RelationshipLabel.CLUSTER_COLO.getName()); - } - - public void addRelationToCluster(Vertex fromVertex, String clusterName, RelationshipLabel edgeLabel) { - Vertex clusterVertex = findVertex(clusterName, RelationshipType.CLUSTER_ENTITY); - if (clusterVertex == null) { // cluster must exist before adding other entities - LOG.error("Illegal State: Cluster entity vertex must exist for {}", clusterName); - throw new 
IllegalStateException("Cluster entity vertex must exist: " + clusterName); - } - - addEdge(fromVertex, clusterVertex, edgeLabel.getName()); - } - - public void addRelationToDatasource(Vertex fromVertex, String datasourceName, RelationshipLabel edgeLabel) { - Vertex clusterVertex = findVertex(datasourceName, RelationshipType.DATASOURCE_ENTITY); - if (clusterVertex == null) { // cluster must exist before adding other entities - LOG.error("Illegal State: Datasource entity vertex must exist for {}", datasourceName); - throw new IllegalStateException("Datasource entity vertex must exist: " + datasourceName); - } - - addEdge(fromVertex, clusterVertex, edgeLabel.getName()); - } - - public void addInputFeeds(Inputs inputs, Vertex processVertex) { - if (inputs == null) { - return; - } - - for (Input input : inputs.getInputs()) { - addProcessFeedEdge(processVertex, input.getFeed(), RelationshipLabel.FEED_PROCESS_EDGE); - } - } - - public void addOutputFeeds(Outputs outputs, Vertex processVertex) { - if (outputs == null) { - return; - } - - for (Output output : outputs.getOutputs()) { - addProcessFeedEdge(processVertex, output.getFeed(), RelationshipLabel.PROCESS_FEED_EDGE); - } - } - - public void addProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) { - Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY); - if (feedVertex == null) { - LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName); - throw new IllegalStateException("Feed entity vertex must exist: " + feedName); - } - - addProcessFeedEdge(processVertex, feedVertex, edgeLabel); - } - - public void addWorkflowProperties(Workflow workflow, Vertex processVertex, String processName) { - processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), - ProcessHelper.getProcessWorkflowName(workflow.getName(), processName)); - processVertex.setProperty(RelationshipProperty.VERSION.getName(), workflow.getVersion()); - 
processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), - workflow.getEngine().value()); - } - - public void updateWorkflowProperties(Workflow oldWorkflow, Workflow newWorkflow, - Vertex processEntityVertex, String processName) { - if (areSame(oldWorkflow, newWorkflow)) { - return; - } - - LOG.info("Updating workflow properties for: {}", processEntityVertex); - addWorkflowProperties(newWorkflow, processEntityVertex, processName); - } - - public void updateDataClassification(String oldClassification, String newClassification, - Vertex entityVertex) { - if (areSame(oldClassification, newClassification)) { - return; - } - - removeDataClassification(oldClassification, entityVertex); - addDataClassification(newClassification, entityVertex); - } - - private void removeDataClassification(String classification, Vertex entityVertex) { - if (classification == null || classification.length() == 0) { - return; - } - - String[] oldTags = classification.split(","); - for (String oldTag : oldTags) { - int index = oldTag.indexOf("="); - String tagKey = oldTag.substring(0, index); - String tagValue = oldTag.substring(index + 1, oldTag.length()); - - removeEdge(entityVertex, tagValue, tagKey); - } - } - - public void updateGroups(String oldGroups, String newGroups, Vertex entityVertex) { - if (areSame(oldGroups, newGroups)) { - return; - } - - removeGroups(oldGroups, entityVertex); - addGroups(newGroups, entityVertex); - } - - public void updatePipelines(String oldPipelines, String newPipelines, Vertex entityVertex) { - if (areSame(oldPipelines, newPipelines)) { - return; - } - - removePipelines(oldPipelines, entityVertex); - addPipelines(newPipelines, entityVertex); - } - - private void removeGroups(String groups, Vertex entityVertex) { - removeGroupsOrPipelines(groups, entityVertex, RelationshipLabel.GROUPS); - } - - private void removePipelines(String pipelines, Vertex entityVertex) { - removeGroupsOrPipelines(pipelines, entityVertex, 
RelationshipLabel.PIPELINES); - } - - private void removeGroupsOrPipelines(String groupsOrPipelines, Vertex entityVertex, - RelationshipLabel edgeLabel) { - if (StringUtils.isEmpty(groupsOrPipelines)) { - return; - } - - String[] oldGroupOrPipelinesTags = groupsOrPipelines.split(","); - for (String groupOrPipelineTag : oldGroupOrPipelinesTags) { - removeEdge(entityVertex, groupOrPipelineTag, edgeLabel.getName()); - } - } - - public static boolean areSame(String oldValue, String newValue) { - return oldValue == null && newValue == null - || oldValue != null && newValue != null && oldValue.equals(newValue); - } - - public void updateFeedClusters(List<org.apache.falcon.entity.v0.feed.Cluster> oldClusters, - List<org.apache.falcon.entity.v0.feed.Cluster> newClusters, - Vertex feedEntityVertex) { - if (areFeedClustersSame(oldClusters, newClusters)) { - return; - } - - // remove edges to old clusters - for (org.apache.falcon.entity.v0.feed.Cluster oldCuster : oldClusters) { - if (ClusterType.TARGET != oldCuster.getType()) { - removeEdge(feedEntityVertex, oldCuster.getName(), - RelationshipLabel.FEED_CLUSTER_EDGE.getName()); - } - } - - // add edges to new clusters - for (org.apache.falcon.entity.v0.feed.Cluster newCluster : newClusters) { - if (ClusterType.TARGET != newCluster.getType()) { - addRelationToCluster(feedEntityVertex, newCluster.getName(), - RelationshipLabel.FEED_CLUSTER_EDGE); - } - } - } - - public boolean areFeedClustersSame(List<org.apache.falcon.entity.v0.feed.Cluster> oldClusters, - List<org.apache.falcon.entity.v0.feed.Cluster> newClusters) { - if (oldClusters.size() != newClusters.size()) { - return false; - } - - List<String> oldClusterNames = getFeedClusterNames(oldClusters); - List<String> newClusterNames = getFeedClusterNames(newClusters); - - return oldClusterNames.size() == newClusterNames.size() - && oldClusterNames.containsAll(newClusterNames) - && newClusterNames.containsAll(oldClusterNames); - } - - public List<String> 
getFeedClusterNames(List<org.apache.falcon.entity.v0.feed.Cluster> clusters) { - List<String> clusterNames = new ArrayList<String>(clusters.size()); - for (org.apache.falcon.entity.v0.feed.Cluster cluster : clusters) { - clusterNames.add(cluster.getName()); - } - - return clusterNames; - } - - public void updateProcessClusters(List<org.apache.falcon.entity.v0.process.Cluster> oldClusters, - List<org.apache.falcon.entity.v0.process.Cluster> newClusters, - Vertex processEntityVertex) { - if (areProcessClustersSame(oldClusters, newClusters)) { - return; - } - - // remove old clusters - for (org.apache.falcon.entity.v0.process.Cluster oldCuster : oldClusters) { - removeEdge(processEntityVertex, oldCuster.getName(), - RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()); - } - - // add new clusters - for (org.apache.falcon.entity.v0.process.Cluster newCluster : newClusters) { - addRelationToCluster(processEntityVertex, newCluster.getName(), - RelationshipLabel.PROCESS_CLUSTER_EDGE); - } - } - - public boolean areProcessClustersSame(List<org.apache.falcon.entity.v0.process.Cluster> oldClusters, - List<org.apache.falcon.entity.v0.process.Cluster> newClusters) { - if (oldClusters.size() != newClusters.size()) { - return false; - } - - List<String> oldClusterNames = getProcessClusterNames(oldClusters); - List<String> newClusterNames = getProcessClusterNames(newClusters); - - return oldClusterNames.size() == newClusterNames.size() - && oldClusterNames.containsAll(newClusterNames) - && newClusterNames.containsAll(oldClusterNames); - } - - public List<String> getProcessClusterNames(List<org.apache.falcon.entity.v0.process.Cluster> clusters) { - List<String> clusterNames = new ArrayList<String>(clusters.size()); - for (org.apache.falcon.entity.v0.process.Cluster cluster : clusters) { - clusterNames.add(cluster.getName()); - } - - return clusterNames; - } - - public static boolean areSame(Workflow oldWorkflow, Workflow newWorkflow) { - return areSame(oldWorkflow.getName(), 
newWorkflow.getName()) - && areSame(oldWorkflow.getVersion(), newWorkflow.getVersion()) - && areSame(oldWorkflow.getEngine().value(), newWorkflow.getEngine().value()); - } - - private void updateProcessInputs(Inputs oldProcessInputs, Inputs newProcessInputs, - Vertex processEntityVertex) { - if (areSame(oldProcessInputs, newProcessInputs)) { - return; - } - - removeInputFeeds(oldProcessInputs, processEntityVertex); - addInputFeeds(newProcessInputs, processEntityVertex); - } - - public static boolean areSame(Inputs oldProcessInputs, Inputs newProcessInputs) { - if (oldProcessInputs == null && newProcessInputs == null) { - return true; - } - - if (oldProcessInputs == null || newProcessInputs == null - || oldProcessInputs.getInputs().size() != newProcessInputs.getInputs().size()) { - return false; - } - - List<Input> oldInputs = oldProcessInputs.getInputs(); - List<Input> newInputs = newProcessInputs.getInputs(); - - return oldInputs.size() == newInputs.size() - && oldInputs.containsAll(newInputs) - && newInputs.containsAll(oldInputs); - } - - public void removeInputFeeds(Inputs inputs, Vertex processVertex) { - if (inputs == null) { - return; - } - - for (Input input : inputs.getInputs()) { - removeProcessFeedEdge(processVertex, input.getFeed(), RelationshipLabel.FEED_PROCESS_EDGE); - } - } - - public void removeOutputFeeds(Outputs outputs, Vertex processVertex) { - if (outputs == null) { - return; - } - - for (Output output : outputs.getOutputs()) { - removeProcessFeedEdge(processVertex, output.getFeed(), RelationshipLabel.PROCESS_FEED_EDGE); - } - } - - public void removeProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) { - Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY); - if (feedVertex == null) { - LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName); - throw new IllegalStateException("Feed entity vertex must exist: " + feedName); - } - - if (edgeLabel == 
RelationshipLabel.FEED_PROCESS_EDGE) { - removeEdge(feedVertex, processVertex, edgeLabel.getName()); - } else { - removeEdge(processVertex, feedVertex, edgeLabel.getName()); - } - } - - private void updateProcessOutputs(Outputs oldProcessOutputs, Outputs newProcessOutputs, - Vertex processEntityVertex) { - if (areSame(oldProcessOutputs, newProcessOutputs)) { - return; - } - - removeOutputFeeds(oldProcessOutputs, processEntityVertex); - addOutputFeeds(newProcessOutputs, processEntityVertex); - } - - public static boolean areSame(Outputs oldProcessOutputs, Outputs newProcessOutputs) { - if (oldProcessOutputs == null && newProcessOutputs == null) { - return true; - } - - if (oldProcessOutputs == null || newProcessOutputs == null - || oldProcessOutputs.getOutputs().size() != newProcessOutputs.getOutputs().size()) { - return false; - } - - List<Output> oldOutputs = oldProcessOutputs.getOutputs(); - List<Output> newOutputs = newProcessOutputs.getOutputs(); - - return oldOutputs.size() == newOutputs.size() - && oldOutputs.containsAll(newOutputs) - && newOutputs.containsAll(oldOutputs); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java b/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java deleted file mode 100644 index 8bec02f..0000000 --- a/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.metadata; - -import com.tinkerpop.blueprints.Direction; -import com.tinkerpop.blueprints.Edge; -import com.tinkerpop.blueprints.Graph; -import com.tinkerpop.blueprints.Vertex; -import com.tinkerpop.blueprints.util.io.graphson.GraphSONWriter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * Utility class for graph operations. - */ -public final class GraphUtils { - - private static final Logger LOG = LoggerFactory.getLogger(GraphUtils.class); - - private GraphUtils() { - } - - public static void dumpToLog(final Graph graph) { - LOG.debug("Vertices of {}", graph); - for (Vertex vertex : graph.getVertices()) { - LOG.debug(vertexString(vertex)); - } - - LOG.debug("Edges of {}", graph); - for (Edge edge : graph.getEdges()) { - LOG.debug(edgeString(edge)); - } - } - - public static void dump(final Graph graph) throws IOException { - dump(graph, System.out); - } - - public static void dump(final Graph graph, OutputStream outputStream) throws IOException { - GraphSONWriter.outputGraph(graph, outputStream); - } - - public static void dump(final Graph graph, String fileName) throws IOException { - GraphSONWriter.outputGraph(graph, fileName); - } - - public static String vertexString(final Vertex vertex) { - StringBuilder properties = new StringBuilder(); - for (String propertyKey : vertex.getPropertyKeys()) { - properties.append(propertyKey) - .append("=").append(vertex.getProperty(propertyKey)) - .append(", "); - } - - return "v[" + vertex.getId() 
+ "], Properties[" + properties + "]"; - } - - public static String edgeString(final Edge edge) { - return "e[" + edge.getLabel() + "], [" - + edge.getVertex(Direction.OUT).getProperty("name") - + " -> " + edge.getLabel() + " -> " - + edge.getVertex(Direction.IN).getProperty("name") - + "]"; - } -}
