Repository: incubator-falcon Updated Branches: refs/heads/master 387604d18 -> 97b5989a1
FALCON-805 Create store to store feed properties like name against its path. Contributed by Ajay Yadav Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/97b5989a Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/97b5989a Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/97b5989a Branch: refs/heads/master Commit: 97b5989a18829b42df209d0dc1505438dee95c31 Parents: 387604d Author: srikanth.sundarrajan <srik...@apache.org> Authored: Mon Dec 1 08:25:19 2014 +0530 Committer: srikanth.sundarrajan <srik...@apache.org> Committed: Mon Dec 1 08:25:19 2014 +0530 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../falcon/entity/store/FeedLocationStore.java | 193 +++++++++ .../falcon/entity/store/FeedPathStore.java | 41 ++ .../java/org/apache/falcon/util/RadixNode.java | 127 ++++++ .../java/org/apache/falcon/util/RadixTree.java | 421 +++++++++++++++++++ .../entity/store/FeedLocationStoreTest.java | 220 ++++++++++ .../org/apache/falcon/util/RadixNodeTest.java | 93 ++++ .../org/apache/falcon/util/RadixTreeTest.java | 282 +++++++++++++ 8 files changed, 1380 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97b5989a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3df2b2a..ed2ff32 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -16,6 +16,9 @@ Trunk (Unreleased) FALCON-807 Fix order of actual and expected expression in assert statements in webapp module (Ajay Yadav via Srikanth Sundarrajan) + FALCON-805 Create store to store feed properties like name against + its path (Ajay Yadav via Srikanth Sundarrajan) + OPTIMIZATIONS FALCON-913 Change the default values of log clean up services (Suhas vasu) 
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97b5989a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java new file mode 100644 index 0000000..e056d96 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.entity.store; + +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.feed.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Location; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.service.ConfigurationChangeListener; +import org.apache.falcon.util.RadixTree; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * A <Key, Value> Store to store FeedProperties against Feed's Locations. + * + * For example: + * let's say a feed - <b>MyFeed</b>, is configured for two clusters - cluster1 and cluster2 and has data location as + * below. + * /projects/myprocess/data/${MONTH}-${DAY}-${HOUR} + * /projects/myprocess/meta/${MONTH}-${DAY}-${HOUR} + * + * then the key,value store will be like below + * key1: /projects/myprocess/data/${MONTH}-${DAY}-${HOUR} + * value1: [FeedProperties("cluster1", LocationType.DATA, "MyFeed"), + * FeedProperties("cluster2", LocationType.DATA, "MyFeed") + * ] + * + * key2: /projects/myprocess/meta/${MONTH}-${DAY}-${HOUR} + * value2: [FeedProperties("cluster1", LocationType.META, "MyFeed"), + * FeedProperties("cluster2", LocationType.META, "MyFeed") + * ] + * + * It ensures that no two Feeds share the same location. 
+ * It can also be used for operations like: + * <ul> + * <li>Find if a there is a feed which uses a given path as it's location.</li> + * <li>Find name of the feed, given it's location.</li> + * </ul> + */ +public final class FeedLocationStore implements ConfigurationChangeListener { + + private static final Logger LOG = LoggerFactory.getLogger(FeedLocationStore.class); + protected final FeedPathStore<FeedProperties> store = new RadixTree<FeedProperties>(); + + private static FeedLocationStore instance = new FeedLocationStore(); + + private FeedLocationStore(){ + } + + public static FeedLocationStore get(){ + return instance; + } + + /** + * Object stored against each path. + */ + public static class FeedProperties { + private final String feedName; + + private final LocationType locationType; + + private final String clusterName; + + public FeedProperties(String feedName, LocationType locationType, String clusterName){ + this.clusterName = clusterName; + this.locationType = locationType; + this.feedName = feedName; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FeedProperties that = (FeedProperties) o; + if (!StringUtils.equals(clusterName, that.clusterName)) { + return false; + } + if (locationType != that.locationType) { + return false; + } + if (!StringUtils.equals(feedName, that.feedName)) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int result = feedName.hashCode(); + result = 31 * result + (locationType != null ? locationType.hashCode() : 0); + result = 31 * result + (clusterName != null ? 
clusterName.hashCode() : 0); + return result; + } + + } + + + @Override + public void onAdd(Entity entity) throws FalconException { + if (entity.getEntityType() == EntityType.FEED){ + Feed feed = (Feed) entity; + List<Cluster> clusters = feed.getClusters().getClusters(); + for(Cluster cluster: clusters){ + List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed, + cluster.getName()), feed); + for(Location location: clusterSpecificLocations){ + if (location != null && StringUtils.isNotBlank(location.getPath())){ + FeedProperties value = new FeedProperties(feed.getName(), location.getType(), + cluster.getName()); + store.insert(StringUtils.trim(location.getPath()), value); + LOG.debug("Inserted location: {} for feed: {} and cluster: {}", + location.getPath(), feed.getName(), cluster.getName()); + } + } + } + } + } + + /** + * Delete the key(path) from the store if the feed is deleted. + * @param entity entity object + * @throws FalconException + */ + @Override + public void onRemove(Entity entity) throws FalconException { + if (entity.getEntityType() == EntityType.FEED){ + + Feed feed = (Feed) entity; + List<Cluster> clusters = feed.getClusters().getClusters(); + for(Cluster cluster: clusters){ + List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed, + cluster.getName()), feed); + for(Location location: clusterSpecificLocations){ + if (location != null && StringUtils.isNotBlank(location.getPath())){ + FeedProperties value = new FeedProperties(feed.getName(), location.getType(), + cluster.getName()); + store.delete(location.getPath(), value); + LOG.debug("Deleted location: {} for feed: {} and cluster: {}", + location.getPath(), feed.getName(), cluster.getName()); + } + } + + } + } + + } + + /** + * Delete the old path and insert the new Path when the feed is updated. 
+ * @param oldEntity old entity object + * @param newEntity updated entity object + * @throws FalconException if the new path already exists in the store. + */ + @Override + public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { + onRemove(oldEntity); + onAdd(newEntity); + } + + @Override + public void onReload(Entity entity) throws FalconException { + onAdd(entity); + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97b5989a/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java new file mode 100644 index 0000000..97d21c1 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.entity.store; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Collection; + +/** + * A <Key, Value> Store to store values against Feed Locations. 
+ * + * @param <T> + */ +public interface FeedPathStore<T> { + + void insert(@Nullable String key, @Nonnull T value); + + int getSize(); + + @Nullable + Collection<T> find(@Nonnull String key); + + boolean delete(@Nonnull String key, @Nonnull T value); + +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97b5989a/common/src/main/java/org/apache/falcon/util/RadixNode.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/RadixNode.java b/common/src/main/java/org/apache/falcon/util/RadixNode.java new file mode 100644 index 0000000..564df8e --- /dev/null +++ b/common/src/main/java/org/apache/falcon/util/RadixNode.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.util; + +import org.apache.commons.lang.StringUtils; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +/** + * Represents a node in Radix Tree. 
+ * + * Each node contains a part of the key, links to it's children and a collection of values + * stored against the key(if the node is the suffix of a key) + * + */ +public class RadixNode<T> { + + private String key; + + private List<RadixNode<T>> children; + + private boolean isTerminal; + + private Set<T> values; + + public RadixNode(){ + key = ""; + children = new LinkedList<RadixNode<T>>(); + isTerminal = false; + values = new HashSet<T>(); + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public List<RadixNode<T>> getChildren() { + return children; + } + + public void setChildren(List<RadixNode<T>> children) { + this.children = children; + } + + public boolean isTerminal() { + return isTerminal; + } + + public void setTerminal(boolean isTerminalNew) { + this.isTerminal = isTerminalNew; + } + + /** + * Root node is the node with a token string(empty String in our case) + * as key. + * + * @return True if the node is root Node, False otherwise + */ + public boolean isRoot(){ + return StringUtils.equals(key, ""); + } + + public Collection<T> getValues() { + return Collections.unmodifiableCollection(values); + } + + public void setValues(Collection<T> newValues) { + values = new HashSet<T>(); + values.addAll(newValues); + } + + public void addValue(T value){ + values.add(value); + } + + public void removeValue(T value) { + values.remove(value); + } + public void removeAll() { + values.clear(); + } + + public boolean containsValue(T value){ + return values.contains(value); + } + + public int getMatchLength(String input){ + int matchLength = 0; + + if (input == null){ + return 0; + } + + while(matchLength < key.length() + && matchLength < input.length() + && input.charAt(matchLength) == key.charAt(matchLength)){ + matchLength += 1; + } + + return matchLength; + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97b5989a/common/src/main/java/org/apache/falcon/util/RadixTree.java 
---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/RadixTree.java b/common/src/main/java/org/apache/falcon/util/RadixTree.java new file mode 100644 index 0000000..6dbe160 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/util/RadixTree.java @@ -0,0 +1,421 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.util; + +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.entity.store.FeedPathStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Collections; +import java.util.Formattable; +import java.util.Formatter; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + + +/** + * A thread-safe Radix Tree implementation of the LocationStore. + * + * + * A radix tree (also patricia trie or radix trie or compact prefix tree) is a space-optimized + * trie data structure where each node with only one child is merged with its parent. 
+ * + * For example the tree representation for the following (key,value) pairs - + * [("key1", "value1"), ("key123", "Key was key123"), ("key124", "Key was key124"), + * ("key2", "value2"), ("random", "random")] will be as below. + * + * | + * |-key + * |--1[[value1]]* + * |---2 + * |----3[[Key was key123]]* + * |----4[[Key was key124]]* + * |--2[[value2]]* + * |-random[[random]]* + * + * For more details on Radix Tree please refer + * <a href="http://en.wikipedia.org/wiki/Radix_tree">Radix Tree</a> + * @param <T> Type of value being stored against the key. + */ +public class RadixTree<T> implements FeedPathStore<T>, Formattable { + private static final Logger LOG = LoggerFactory.getLogger(RadixTree.class); + + protected RadixNode<T> root; + + private int size; + + public RadixTree(){ + root = new RadixNode<T>(); + root.setKey(""); + size = 0; + } + + /** + * Return the number of keys stored in the tree. + * + * Since all keys end in terminal nodes and duplicate keys are not allowed, + * size is equal to the number of terminal nodes in the tree. + * @return number of keys in the tree. + */ + @Override + public synchronized int getSize() { + return size; + } + + /** + * Insert a <key, value> pair in the Radix Tree. 
+ * + * @param key Key to be stored + * @param value Value to be stored against that key + */ + @Override + public synchronized void insert(@Nullable String key, @Nonnull T value){ + if (key != null && !key.trim().isEmpty()){ + LOG.debug("Insert called for key: {} and value: {}", key.trim(), value); + insertKeyRecursive(key.trim(), value, root); + } + } + + private void insertKeyRecursive(String remainingText, T value, RadixNode<T> currentNode){ + + int currentMatchLength = currentNode.getMatchLength(remainingText); + String newRemainingText = remainingText.substring(currentMatchLength, remainingText.length()); + + // if root or current node key is subset of the input key GO DOWN + if (currentNode.isRoot() + || (currentMatchLength == currentNode.getKey().length() + && currentMatchLength < remainingText.length())){ + + // if a path to go down exists then go down that path + boolean foundPath = false; + for(RadixNode<T> child: currentNode.getChildren()){ + if (child.getKey().charAt(0) == newRemainingText.charAt(0)){ + insertKeyRecursive(newRemainingText, value, child); + foundPath = true; + break; + } + } + // else create a new node. 
+ if (!foundPath){ + RadixNode<T> node = new RadixNode<T>(); + node.setKey(newRemainingText); + node.addValue(value); + node.setTerminal(true); + currentNode.getChildren().add(node); + size += 1; + } + }else if (currentMatchLength == remainingText.length() && currentMatchLength < currentNode.getKey().length()){ + // if remainingText is subset of the current node key + RadixNode<T> node = new RadixNode<T>(); + node.setChildren(currentNode.getChildren()); + node.setKey(currentNode.getKey().substring(currentMatchLength)); + node.setValues(currentNode.getValues()); + node.setTerminal(currentNode.isTerminal()); + + currentNode.setChildren(new LinkedList<RadixNode<T>>()); + currentNode.getChildren().add(node); + currentNode.setTerminal(true); + currentNode.setKey(currentNode.getKey().substring(0, currentMatchLength)); + currentNode.removeAll(); + currentNode.addValue(value); + + size += 1; + + }else if (currentMatchLength < remainingText.length() && currentMatchLength < currentNode.getKey().length()){ + + //add new Node and move all current node's children and value to it + RadixNode<T> node = new RadixNode<T>(); + node.setChildren(currentNode.getChildren()); + node.setTerminal(currentNode.isTerminal()); + node.setValues(currentNode.getValues()); + node.setKey(currentNode.getKey().substring(currentMatchLength, currentNode.getKey().length())); + + // add node for the text + RadixNode<T> node2 = new RadixNode<T>(); + node2.setKey(newRemainingText); + node2.setTerminal(true); + node2.addValue(value); + + //update current node to be new root + currentNode.setTerminal(false); + currentNode.setKey(currentNode.getKey().substring(0, currentMatchLength)); + currentNode.setChildren(new LinkedList<RadixNode<T>>()); + currentNode.getChildren().add(node); + currentNode.getChildren().add(node2); + + size += 1; + }else if (currentMatchLength == remainingText.length() && currentMatchLength == currentNode.getKey().length()){ + // if current node key and input key both match equally + if 
(currentNode.isTerminal()){ + currentNode.addValue(value); + }else { + currentNode.setTerminal(true); + currentNode.addValue(value); + } + size += 1; + } + } + + /** + * Find the value for the given key if it exists in the tree, null otherwise. + * + * A key is said to exist in the tree if we can generate exactly that string + * by going down from root to a terminal node. If a key exists we return the value + * stored at the terminal node. + * + * @param key - input key to be searched. + * @return T Value of the key if it exists, null otherwise + */ + @Override + @Nullable + public synchronized Collection<T> find(@Nonnull String key) { + if (key != null && !key.trim().isEmpty()){ + return recursiveFind(key.trim(), root); + } + return null; + } + + private Collection<T> recursiveFind(String key, RadixNode<T> currentNode){ + + if (!key.startsWith(currentNode.getKey())){ + LOG.debug("Current Node key: {} is not a prefix in the input key: {}", currentNode.getKey(), key); + return null; + } + + if (StringUtils.equals(key, currentNode.getKey())){ + if (currentNode.isTerminal()){ + LOG.debug("Found the terminal node with key: {} for the given input.", currentNode.getKey()); + return currentNode.getValues(); + }else { + LOG.debug("currentNode is not terminal. Current node's key is {}", currentNode.getKey()); + return null; + } + } + + //find child to follow, using remaining Text + RadixNode<T> newRoot = null; + String remainingText = key.substring(currentNode.getKey().length()); + for(RadixNode<T> child : currentNode.getChildren()){ + if (child.getKey().charAt(0) == remainingText.charAt(0)){ + newRoot = child; + break; + } + } + + if (newRoot == null){ + LOG.debug("No child found to follow for further processing. 
Current node key {}"); + return null; + }else { + LOG.debug("Recursing with new key: {} and new remainingText: {}", newRoot.getKey(), remainingText); + return recursiveFind(remainingText, newRoot); + } + } + + /** + * Deletes a given key,value pair from the Radix Tree. + * + * @param key key to be deleted + * @param value value to be deleted + */ + @Override + public synchronized boolean delete(@Nonnull String key, @Nonnull T value) { + if (key != null && !key.trim().isEmpty()){ + LOG.debug("Delete called for key:{}", key.trim()); + return recursiveDelete(key, null, root, value); + } + return false; + } + + private boolean recursiveDelete(String key, RadixNode<T> parent, RadixNode<T> currentNode, T value){ + LOG.debug("Recursing with key: {}, currentNode: {}", key, currentNode.getKey()); + if (!key.startsWith(currentNode.getKey())){ + LOG.debug("Current node's key: {} is not a prefix of the remaining input key: {}", + currentNode.getKey(), key); + return false; + } + + if (StringUtils.equals(key, currentNode.getKey())){ + if (currentNode.getValues().contains(value)){ + LOG.debug("Given value is found in the collection of values against the given key"); + currentNode.removeValue(value); + size -= 1; + if (currentNode.getValues().size() == 0){ + LOG.debug("Exact match between current node's key: {} and remaining input key: {}", + currentNode.getKey(), key); + if (currentNode.isTerminal()){ + //if child has no children & only one value, then delete and compact parent if needed + if (currentNode.getChildren().size() == 0){ + Iterator<RadixNode<T>> it = parent.getChildren().iterator(); + while(it.hasNext()){ + if (StringUtils.equals(it.next().getKey(), currentNode.getKey())){ + it.remove(); + LOG.debug("Deleting the node"); + break; + } + } + }else if (currentNode.getChildren().size() > 1){ + // if child has more than one children just mark non terminal + currentNode.setTerminal(false); + }else if (currentNode.getChildren().size() == 1){ + // if child has only one child 
then compact node + LOG.debug("compacting node with child as node to be deleted has only 1 child"); + RadixNode<T> child = currentNode.getChildren().get(0); + currentNode.setChildren(child.getChildren()); + currentNode.setTerminal(child.isTerminal()); + currentNode.setKey(currentNode.getKey() + child.getKey()); + currentNode.setValues(child.getValues()); + } + + //parent can't be null as root will never match with input key as it is not a terminal node. + if (!parent.isTerminal() && !parent.isRoot()){ + // if only one child left in parent and parent is not root then join parent + // and the only child key + if (parent.getChildren().size() == 1){ + RadixNode<T> onlyChild = parent.getChildren().get(0); + String onlyChildKey = onlyChild.getKey(); + LOG.debug("Compacting child: {} and parent: {}", onlyChildKey, parent.getKey()); + parent.setKey(parent.getKey() + onlyChildKey); + parent.setChildren(onlyChild.getChildren()); + parent.setTerminal(onlyChild.isTerminal()); + parent.setValues(onlyChild.getValues()); + } + } + return true; + }else{ + LOG.debug("Key found only as a prefix and not at a terminal node"); + return false; + } + } + }else { + LOG.debug("Current value is not found in the collection of values against the given key, no-op"); + return false; + } + } + + LOG.debug("Current node's key: {} is a prefix of the input key: {}", currentNode.getKey(), key); + //find child to follow + RadixNode<T> newRoot = null; + String remainingKey = key.substring(currentNode.getMatchLength(key)); + for(RadixNode<T> el : currentNode.getChildren()){ + if (el.getKey().charAt(0) == remainingKey.charAt(0)){ + newRoot = el; + break; + } + } + + if (newRoot == null){ + LOG.debug("No child was found with common prefix with the remainder key: {}", key); + return false; + }else { + LOG.debug("Found a child's key: {} with common prefix, recursing on it", newRoot.getKey()); + return recursiveDelete(remainingKey, currentNode, newRoot, value); + } + } + + + /** + * Useful for debugging. 
+ */ + @Override + public void formatTo(Formatter formatter, int flags, int width, int precision) { + formatNodeTo(formatter, 0, root); + + } + + private void formatNodeTo(Formatter formatter, int level, RadixNode<T> node){ + for (int i = 0; i < level; i++) { + formatter.format(" "); + } + formatter.format("|"); + for (int i = 0; i < level; i++) { + formatter.format("-"); + } + + if (node.isTerminal()){ + formatter.format("%s[%s]*%n", node.getKey(), node.getValues()); + }else{ + formatter.format("%s%n", node.getKey()); + } + + for (RadixNode<T> child : node.getChildren()) { + formatNodeTo(formatter, level + 1, child); + } + } + + /** + * Find List of substring of keys which have given input as a prefix. + * + * @param key - Input string for which all Suffix Children should be returned + * @param limit - Maximum Number of results. If limit is less than 0 then all nodes are returned. + * If limit is 0 then returns null. + */ + @javax.annotation.Nullable + public List<String> findSuffixChildren(String key, int limit){ + if (key == null || limit == 0){ + return null; + } + RadixNode<T> currentNode = root; + String remainingText = key.trim(); + List<String> result = new LinkedList<String>(); + do{ + boolean flag = false; + // find the child with common prefix + for(RadixNode<T> child: currentNode.getChildren()){ + LOG.debug("Checking for child key: {} against remainingText: {}", child.getKey(), remainingText); + if (child.getKey().charAt(0) == remainingText.charAt(0)){ + LOG.debug("Child key: {} found to have overlap with the remainingText: {}", child.getKey(), + remainingText); + flag = true; + + //if entire key doesn't match return null + if (!remainingText.startsWith(child.getKey())){ + return null; + } + + // if entire key equals remainingText - return it's children up to the specified limit + if (StringUtils.equals(child.getKey(), remainingText)){ + int counter = 0; + + for(RadixNode<T> suffixChild: child.getChildren()){ + if (limit < 0 || counter < limit){ + 
result.add(suffixChild.getKey()); + } + } + return Collections.unmodifiableList(result); + } + + //if entire key matches but it is not equal to entire remainingText - repeat + remainingText = remainingText.substring(child.getKey().length()); + currentNode = child; + break; + + } + } + // if no child found with common prefix return null; + if (!flag){ + return null; + } + }while (true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97b5989a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java new file mode 100644 index 0000000..86ef775 --- /dev/null +++ b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.entity.store; + +import org.apache.commons.io.FileUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.feed.Cluster; +import org.apache.falcon.entity.v0.feed.Clusters; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Location; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.feed.Locations; +import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.util.StartupProperties; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.File; +import java.net.URI; +import java.util.Collection; + + +/** + * Tests for FeedLocationStore. + * + * Feeds are published, updated and removed through the ConfigurationStore and most tests + * then assert on the size reported by FeedLocationStore.get().store. + */ +public class FeedLocationStoreTest { + private ConfigurationStore store; + + + /** Runs once before the tests: appends this class name to the path of config.store.uri and stores it back, deletes that directory, removes all entities from the store, sets configstore.listeners to the FeedLocationStore class name, then initialises the store and authenticates as testuser. */ @BeforeClass + public void initConfigStore() throws Exception { + String configPath = new URI(StartupProperties.get().getProperty("config.store.uri")).getPath(); + String location = configPath + "-" + getClass().getName(); + StartupProperties.get().setProperty("config.store.uri", location); + FileUtils.deleteDirectory(new File(location)); + + cleanupStore(); + StartupProperties.get().setProperty("configstore.listeners", + "org.apache.falcon.entity.store.FeedLocationStore"); + store = ConfigurationStore.get(); + store.init(); + + CurrentUser.authenticate("testuser"); + + } + /** Empties the configuration store before every test method. */ @BeforeMethod + public void setUp() throws FalconException{ + cleanupStore(); + } + + /** Prints the FeedLocationStore path store to standard output after every test method. */ @AfterMethod + public void print() { + System.out.printf("%s", FeedLocationStore.get().store); + } + + /** Publishes two feeds that use the same two paths with DATA and STATS swapped and expects the store size to grow by 8. NOTE(review): 8 presumably is 2 feeds x 2 locations x 2 blank clusters; confirm against FeedLocationStore. */ @Test + public void testOnAddSameLocation() throws FalconException{ + Feed f1 = createFeed("f1SameLocations"); + int initialSize = FeedLocationStore.get().store.getSize(); +
f1.getLocations().getLocations().add(createLocation(LocationType.DATA, + "/projects/cas/data/hourly/2014/09/09/09")); + f1.getLocations().getLocations().add(createLocation(LocationType.STATS, + "/projects/cas/stats/hourly/2014/09/09/09")); + + /* f2 reuses both paths of f1 with the location types swapped */ Feed f2 = createFeed("f2SameLocations"); + f2.getLocations().getLocations().add(createLocation(LocationType.STATS, + "/projects/cas/data/hourly/2014/09/09/09")); + f2.getLocations().getLocations().add(createLocation(LocationType.DATA, + "/projects/cas/stats/hourly/2014/09/09/09")); + + store.publish(EntityType.FEED, f1); + store.publish(EntityType.FEED, f2); + int finalSize = FeedLocationStore.get().store.getSize(); + Assert.assertEquals(finalSize - initialSize, 8); + } + + /** Publishes a feed whose DATA and STATS locations share one path, expects the size to grow by 4, then removes the feed and expects the size to return to its initial value. */ @Test + public void testOnRemove() throws FalconException{ + int initialSize = FeedLocationStore.get().store.getSize(); + + Feed f1 = createFeed("f1ForRemove"); + f1.getLocations().getLocations().add(createLocation(LocationType.DATA, + "/projects/cas/data/hourly/2014/09/09/09")); + f1.getLocations().getLocations().add(createLocation(LocationType.STATS, + "/projects/cas/data/hourly/2014/09/09/09")); + + store.publish(EntityType.FEED, f1); + Assert.assertEquals(FeedLocationStore.get().store.getSize() - initialSize, 4); + store.remove(EntityType.FEED, "f1ForRemove"); + Assert.assertEquals(FeedLocationStore.get().store.getSize(), initialSize); + + } + + + /** Publishes feed f1 with an hourly DATA path, updates f1 to a monthly DATA path via initiateUpdate, update and cleanupUpdateInit, then publishes feed f2 with a STATS location on the original hourly path. NOTE(review): there are no assertions, so this only fails if one of the store calls throws. */ @Test + public void testOnChange() throws FalconException{ + Feed f1 = createFeed("f1"); + f1.getLocations().getLocations().add(createLocation(LocationType.DATA, + "/projects/cas/data/hourly/2014/09/09/09")); + store.publish(EntityType.FEED, f1); + + /* same feed name f1, different DATA path: this object is the updated definition */ Feed f2 = createFeed("f1"); + f2.getLocations().getLocations().add(createLocation(LocationType.DATA, + "/projects/cas/data/monthly")); + store.initiateUpdate(f2); + store.update(EntityType.FEED, f2); + store.cleanupUpdateInit(); + + Feed f3 = createFeed("f2"); + f3.getLocations().getLocations().add(createLocation(LocationType.STATS, +
"/projects/cas/data/hourly/2014/09/09/09")); + store.publish(EntityType.FEED, f3); + + } + + /** Publishes a feed that has feed level locations and per cluster locations, expects the size to grow by 6, then removes it and expects the initial size again. NOTE(review): 6 presumably is 3 locations on each of the 2 clusters; confirm against FeedLocationStore. */ @Test + public void testWithClusterLocations() throws FalconException { + Feed f = createFeedWithClusterLocations("clusterFeed"); + int initialSize = FeedLocationStore.get().store.getSize(); + store.publish(EntityType.FEED, f); + Assert.assertEquals(FeedLocationStore.get().store.getSize() - initialSize, 6); + store.remove(EntityType.FEED, "clusterFeed"); + Assert.assertEquals(FeedLocationStore.get().store.getSize(), initialSize); + } + + + /** Builds a feed with the given name, an empty Locations object and the clusters from createBlankClusters. */ private Feed createFeed(String name){ + Feed f = new Feed(); + Locations locations = new Locations(); + f.setLocations(locations); + f.setName(name); + f.setClusters(createBlankClusters()); + return f; + } + + + /** Builds a feed with the given name, feed level DATA, STATS and META locations, and the clusters from createClustersWithLocations. */ private Feed createFeedWithClusterLocations(String name) { + Feed f = new Feed(); + f.setLocations(new Locations()); + f.getLocations().getLocations().add(createLocation(LocationType.DATA, "/projects/cas/data")); + f.getLocations().getLocations().add(createLocation(LocationType.STATS, "/projects/cas/stats")); + f.getLocations().getLocations().add(createLocation(LocationType.META, "/projects/cas/meta")); + f.setName(name); + f.setClusters(createClustersWithLocations()); + return f; + } + + /** Builds a Location with the given type and path. */ private Location createLocation(LocationType type, String path){ + Location location = new Location(); + location.setPath(path); + location.setType(type); + return location; + } + + /** Assigns ConfigurationStore.get() to the store field and removes every entity of every EntityType from it. */ private void cleanupStore() throws FalconException { + store = ConfigurationStore.get(); + for (EntityType type : EntityType.values()) { + Collection<String> entities = store.getEntities(type); + for (String entity : entities) { + store.remove(type, entity); + } + } + } + + /** Builds two clusters, cluster1WithLocations and cluster2WithLocations, each with its own DATA, STATS and META locations. */ private Clusters createClustersWithLocations() { + Clusters clusters = new Clusters(); + Cluster cluster1 = new Cluster(); + cluster1.setName("cluster1WithLocations"); + cluster1.setLocations(new Locations()); + cluster1.getLocations().getLocations().add(createLocation(LocationType.DATA,
"/projects/cas/cluster1/data")); + cluster1.getLocations().getLocations().add(createLocation(LocationType.STATS, "/projects/cas/cluster1/stats")); + cluster1.getLocations().getLocations().add(createLocation(LocationType.META, "/projects/cas/cluster1/meta")); + + Cluster cluster2 = new Cluster(); + cluster2.setName("cluster2WithLocations"); + cluster2.setLocations(new Locations()); + cluster2.getLocations().getLocations().add(createLocation(LocationType.DATA, "/projects/cas/cluster2/data")); + cluster2.getLocations().getLocations().add(createLocation(LocationType.STATS, "/projects/cas/cluster2/stats")); + cluster2.getLocations().getLocations().add(createLocation(LocationType.META, "/projects/cas/cluster2/meta")); + + clusters.getClusters().add(cluster1); + clusters.getClusters().add(cluster2); + + return clusters; + } + + /** Builds two clusters, blankCluster1 and blankCluster2, with only a name set. */ private Clusters createBlankClusters() { + Clusters clusters = new Clusters(); + + Cluster cluster = new Cluster(); + cluster.setName("blankCluster1"); + clusters.getClusters().add(cluster); + + Cluster cluster2 = new Cluster(); + cluster2.setName("blankCluster2"); + clusters.getClusters().add(cluster2); + + return clusters; + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97b5989a/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java b/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java new file mode 100644 index 0000000..4f63806 --- /dev/null +++ b/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.util; + +import org.testng.Assert; +import org.testng.annotations.*; + +import java.util.Arrays; +import java.util.HashSet; + +/** + * Tests for Radix Node. + * + * Exercises getMatchLength, addValue, getValues and containsValue on two fixtures: + * a node with an empty key and a node keyed with a slash separated path. + */ +public class RadixNodeTest { + private RadixNode<String> rootNode = new RadixNode<String>(); + private RadixNode<String> normalNode = new RadixNode<String>(); + + /** Resets both fixtures before each test: rootNode gets an empty key and the single value root, normalNode gets a path key and the single value CAS Project. */ @BeforeMethod + public void setUp(){ + rootNode.setKey(""); + rootNode.setValues(new HashSet<String>(Arrays.asList("root"))); + + normalNode.setKey("/data/cas/"); + normalNode.setValues(new HashSet<String>(Arrays.asList("CAS Project"))); + + } + + + /** A node with an empty key is expected to report a match length of 0 for a path input. */ @Test + public void testMatchingWithRoot(){ + String inputKey = "/data/cas/"; + Assert.assertEquals(rootNode.getMatchLength(inputKey), 0); + } + + /** A node with an empty key is expected to report a match length of 0 for an empty input. */ @Test + public void testEmptyMatchingWithRoot(){ + String inputKey = ""; + Assert.assertEquals(rootNode.getMatchLength(inputKey), 0); + } + + /** A node with an empty key is expected to report a match length of 0 for a null input. */ @Test + public void testNullMatchingWithRoot(){ + Assert.assertEquals(rootNode.getMatchLength(null), 0); + } + + /** The input has no leading slash while the node key has one; the expected match length is 0. */ @Test + public void testDistinctStringMatching(){ + String inputKey = "data/cas"; + Assert.assertEquals(normalNode.getMatchLength(inputKey), 0); + } + + /** The input is the node key without its trailing slash; the expected match length is 9, the length of the input. */ @Test + public void testSameStringMatching(){ + String inputKey = "/data/cas"; + Assert.assertEquals(normalNode.getMatchLength(inputKey), 9); + } + + /** A null input against a node with a non-empty key is expected to give a match length of 0. */ @Test + public void testNullStringMatching(){ +
Assert.assertEquals(normalNode.getMatchLength(null), 0); + } + + + /** Adding the value root a second time is expected to leave exactly one value on the node. */ @Test + public void testAddingDuplicateValues() { + rootNode.addValue("root"); + Assert.assertEquals(rootNode.getValues().size(), 1); + } + + /** After adding the value data, the node is expected to contain both data and the value set in setUp. */ @Test + public void testAddMultipleValues() { + normalNode.addValue("data"); + Assert.assertTrue(normalNode.containsValue("data")); + Assert.assertTrue(normalNode.containsValue("CAS Project")); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/97b5989a/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java b/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java new file mode 100644 index 0000000..28589ed --- /dev/null +++ b/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.falcon.util; + +import org.apache.falcon.entity.store.FeedLocationStore; +import org.apache.falcon.entity.store.FeedPathStore; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.List; + +/** + * Tests for Radix Tree. + * + * setUp builds a tree holding the keys key1, key2 and random; most tests insert into + * or delete from that tree and then check find, delete return values and getSize. + */ +public class RadixTreeTest { + + private RadixTree<String> tree; + + /** Creates a fresh tree with the keys key1, key2 and random before each test. */ @BeforeMethod + public void setUp() { + tree = new RadixTree<String>(); + tree.insert("key1", "value1"); + tree.insert("key2", "value2"); + tree.insert("random", "random"); + + } + + /** Drops the tree reference after each test. */ @AfterMethod + public void reset() { + tree = null; + } + + /** Inserts one path key into a new empty tree, referenced through the FeedPathStore type, and expects find to return exactly that one value. */ @Test + public void testInsertAtRootTest() { + FeedPathStore<String> tree2 = new RadixTree<String>(); + tree2.insert("/data/cas/projects/dwh/", "dwh"); + Assert.assertEquals(tree2.find("/data/cas/projects/dwh/").size(), 1); + Assert.assertTrue(tree2.find("/data/cas/projects/dwh/").contains("dwh")); + + } + + + /** Two values inserted under one key are both expected back from find. */ @Test + public void testDuplicateKeyInsert() { + tree.insert("duplicatekey", "value1"); + tree.insert("duplicatekey", "value2"); + Assert.assertEquals(tree.find("duplicatekey").size(), 2); + Assert.assertTrue(tree.find("duplicatekey").contains("value1")); + Assert.assertTrue(tree.find("duplicatekey").contains("value2")); + + } + + /** Inserts a key that shares no prefix with the three setUp keys; the expected size is 4. */ @Test + public void testNoOverlap() { + tree.insert("water", "No Overlap"); + Assert.assertEquals(tree.getSize(), 4); + } + + /** The new key rand is a prefix of the existing key random; the expected size is 4. */ @Test + public void testInputKeySubset() { + tree.insert("rand", "Input Subset"); + Assert.assertEquals(tree.getSize(), 4); + + } + + /** The new key randomiser extends the existing key random; the expected size is 4. */ @Test + public void testInputKeySuperset() { + tree.insert("randomiser", "Input Superset"); + Assert.assertEquals(tree.getSize(), 4); + } + + + /** A slash separated key is expected to be stored and found with its value; the expected size is 4. */ @Test + public void testInputKeyPathStyle() { + tree.insert("/data/cas/projects/", "path"); + Assert.assertEquals(tree.getSize(), 4); + Assert.assertTrue(tree.find("/data/cas/projects/").contains("path")); + } +
+ + // Tests for find String + /** Inserts two paths that share a prefix; find on the shared prefix, which was never inserted itself, is expected to return null and the size is expected to be 5. */ @Test + public void testSubstringPathFind() { + tree.insert("/data/cas/projects/rtbd/", "rtbd"); + tree.insert("/data/cas/projects/dwh/", "dwh"); + Assert.assertEquals(tree.getSize(), 5); + Assert.assertTrue(tree.find("/data/cas/projects/rtbd/").contains("rtbd")); + Assert.assertTrue(tree.find("/data/cas/projects/dwh/").contains("dwh")); + Assert.assertNull(tree.find("/data/cas/projects/")); + } + + /** After inserting rand and randomizer around the existing key random, all three keys are expected to be found with their own values. */ @Test + public void testStringSplitFind() { + tree.insert("rand", "rand"); + tree.insert("randomizer", "randomizer"); + Assert.assertTrue(tree.find("rand").contains("rand")); + Assert.assertTrue(tree.find("random").contains("random")); + Assert.assertTrue(tree.find("randomizer").contains("randomizer")); + + } + + // Tests for delete method + /** Deleting randomizer is expected to return true and make find return null for it, while random stays findable. */ @Test + public void testDeleteChildOfTerminal() { + tree.insert("rand", "rand"); + tree.insert("randomizer", "randomizer"); + Assert.assertTrue(tree.delete("randomizer", "randomizer")); + Assert.assertNull(tree.find("randomizer")); + Assert.assertTrue(tree.find("random").contains("random")); + } + + /** After deleting rand, find is expected to return null for rand while the longer keys random and randomizer remain findable. */ @Test + public void testMarkingNonTerminal() { + tree.insert("rand", "rand"); + tree.insert("randomizer", "randomizer"); + tree.delete("rand", "rand"); + Assert.assertNull(tree.find("rand")); + Assert.assertTrue(tree.find("random").contains("random")); + Assert.assertTrue(tree.find("randomizer").contains("randomizer")); + } + + /** The first delete of rand is expected to return true and a second delete of the same key and value false. */ @Test + public void testDoubleDelete() { + tree.insert("rand", "rand"); + tree.insert("randomizer", "randomizer"); + Assert.assertTrue(tree.delete("rand", "rand")); + Assert.assertFalse(tree.delete("rand", "rand")); + Assert.assertNull(tree.find("rand")); + Assert.assertTrue(tree.find("random").contains("random")); + Assert.assertTrue(tree.find("randomizer").contains("randomizer")); + } + + /** Deletes random, whose text lies between rand and randomizer; the other two keys are expected to remain and the size to be 4. */ @Test + public void testChildCompactionDelete() { + tree.insert("rand", "rand"); + tree.insert("randomizer", "randomizer"); + Assert.assertTrue(tree.delete("random", "random")); +
Assert.assertNull(tree.find("random")); + Assert.assertTrue(tree.find("rand").contains("rand")); + Assert.assertTrue(tree.find("randomizer").contains("randomizer")); + Assert.assertEquals(tree.getSize(), 4); + } + + /** Deletes the longest key randomizer; rand and random are expected to remain and the size to be 4. */ @Test + public void testParentCompactionDelete() { + tree.insert("rand", "rand"); + tree.insert("randomizer", "randomizer"); + Assert.assertTrue(tree.delete("randomizer", "randomizer")); + Assert.assertNull(tree.find("randomizer")); + Assert.assertTrue(tree.find("rand").contains("rand")); + Assert.assertTrue(tree.find("random").contains("random")); + Assert.assertEquals(tree.getSize(), 4); + + } + + /** Deletes randomizer, rand and random in turn and expects sizes 4, 3 and 2 after the respective deletes. */ @Test + public void testSequencesOfDelete() { + tree.insert("rand", "rand"); + tree.insert("randomizer", "randomizer"); + + Assert.assertTrue(tree.delete("randomizer", "randomizer")); + Assert.assertNull(tree.find("randomizer")); + Assert.assertTrue(tree.find("rand").contains("rand")); + Assert.assertTrue(tree.find("random").contains("random")); + Assert.assertEquals(tree.getSize(), 4); + + Assert.assertTrue(tree.delete("rand", "rand")); + Assert.assertNull(tree.find("rand")); + Assert.assertTrue(tree.find("random").contains("random")); + Assert.assertEquals(tree.getSize(), 3); + + Assert.assertTrue(tree.delete("random", "random")); + Assert.assertNull(tree.find("random")); + Assert.assertEquals(tree.getSize(), 2); + + } + + /** Deletes random and key2, leaving only key1, then expects a newly inserted key water to be findable. */ @Test + public void testRootNotCompactedInDelete() { + Assert.assertTrue(tree.delete("random", "random")); + Assert.assertTrue(tree.delete("key2", "value2")); + tree.insert("water", "water"); + Assert.assertTrue(tree.find("water").contains("water")); + + } + + /** Deleting a key that was never inserted is expected to return false. */ @Test + public void testDeleteNonExistent() { + Assert.assertFalse(tree.delete("zzz", "zzz")); + } + + /** The key ke is only a prefix of stored keys; delete is expected to return false. */ @Test + public void testDeleteSubstring() { + Assert.assertFalse(tree.delete("ke", "ke")); + } + + /** The key named key is the common prefix of key1 and key2 but was never inserted; delete is expected to return false. */ @Test + public void testDeleteNonTerminal() { + Assert.assertFalse(tree.delete("key", "key")); + } + + + /** Delete is expected to return false for an empty string, a single space and null. */ @Test + public void testDeleteBlankOrEmptyOrNullString(){ +
Assert.assertFalse(tree.delete("", "")); + Assert.assertFalse(tree.delete(" ", " ")); + Assert.assertFalse(tree.delete(null, null)); + } + + /** With key1, key2, key123 and key124 stored, findSuffixChildren for the prefix key with second argument 2 is expected to return the two entries 1 and 2. NOTE(review): the meaning of the second argument is not visible here; confirm against RadixTree. */ @Test + public void testAllSuffixForFirstLevelKey() { + tree.insert("key123", "Key was key123"); + tree.insert("key124", "Key was key124"); + List<String> result = tree.findSuffixChildren("key", 2); + Assert.assertEquals(result.size(), 2); + Assert.assertTrue(result.contains("1")); + Assert.assertTrue(result.contains("2")); + } + + /** With the same keys stored, findSuffixChildren for the prefix key1 with second argument 2 is expected to return the single entry 2. */ @Test + public void testAllSuffixForNestedLevelKey() { + tree.insert("key123", "Key was key123"); + tree.insert("key124", "Key was key124"); + Assert.assertEquals(tree.findSuffixChildren("key1", 2).size(), 1); + Assert.assertEquals(tree.findSuffixChildren("key1", 2).get(0), "2"); + } + + /** FeedProperties built from the same feed name, location type and cluster name are expected to be equal; changing any one of the three is expected to break equality. */ @Test + public void testFeedPropertiesEquals() { + FeedLocationStore.FeedProperties f1 = new FeedLocationStore.FeedProperties("feed", + LocationType.DATA, "cluster"); + FeedLocationStore.FeedProperties f1Copy = new FeedLocationStore.FeedProperties("feed", + LocationType.DATA, "cluster"); + FeedLocationStore.FeedProperties f3 = new FeedLocationStore.FeedProperties("anotherFeed", + LocationType.DATA, "cluster"); + FeedLocationStore.FeedProperties f4 = new FeedLocationStore.FeedProperties("feed", + LocationType.STATS, "cluster"); + FeedLocationStore.FeedProperties f5 = new FeedLocationStore.FeedProperties("feed", + LocationType.DATA, "anotherCluster"); + + Assert.assertTrue(f1.equals(f1Copy)); + Assert.assertFalse(f1.equals(f3)); + Assert.assertFalse(f1.equals(f4)); + Assert.assertFalse(f1.equals(f5)); + + } + + /** Two values under one key: deleting the first is expected to leave only the second, and deleting the second is expected to make find return null. */ @Test + public void testMultipleValues(){ + tree.insert("keyWithMultipleValues", "value1"); + tree.insert("keyWithMultipleValues", "value2"); + Assert.assertEquals(tree.find("keyWithMultipleValues").size(), 2); + Assert.assertTrue(tree.find("keyWithMultipleValues").contains("value1")); + Assert.assertTrue(tree.find("keyWithMultipleValues").contains("value2")); + + tree.delete("keyWithMultipleValues", "value1");
+ Assert.assertTrue(tree.find("keyWithMultipleValues").contains("value2")); + Assert.assertFalse(tree.find("keyWithMultipleValues").contains("value1")); + + tree.delete("keyWithMultipleValues", "value2"); + Assert.assertNull(tree.find("keyWithMultipleValues")); + } +}