http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java deleted file mode 100644 index 35f03b9..0000000 --- a/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java +++ /dev/null @@ -1,321 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.commons.lang.RandomStringUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.entity.common.FeedDataPath; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Falcon specific utilities for the Radix Tree. - */ -public class FalconRadixUtils { - - /** - * This interface implements the various algorithms to compare node's key with input based on whether you want - * a regular expression based algorithm or a character by character matching algorithm. - */ - public interface INodeAlgorithm { - - /** - * Checks if the given key and input match. - * @param key key of the node - * @param input input String to be matched against key. - * @return true if key and input match. - */ - boolean match(String key, String input); - - boolean startsWith(String key, String input); - - /** - * Finds next node to take for traversal among currentNode's children. - * @param currentNode of RadixTree which has been matched. - * @param input input String to be searched. - * @return Node to be traversed next. - */ - RadixNode getNextCandidate(RadixNode currentNode, String input); - - // for the given node and input key, finds the remainingText to be matched with child sub tree. - String getRemainingText(RadixNode currentNode, String key); - } - - /** - * This Algorithm does a plain string comparison for all - * type of operations on a node. 
- */ - static class StringAlgorithm implements INodeAlgorithm { - - @Override - public boolean match(String key, String input) { - return StringUtils.equals(key, input); - } - - @Override - public boolean startsWith(String nodeKey, String inputKey) { - return inputKey.startsWith(nodeKey); - } - - @Override - public RadixNode getNextCandidate(RadixNode currentNode, String input) { - RadixNode newRoot = null; - String remainingText = input.substring(currentNode.getKey().length()); - List<RadixNode> result = currentNode.getChildren(); - for(RadixNode child : result){ - if (child.getKey().charAt(0) == remainingText.charAt(0)){ - newRoot = child; - break; - } - } - return newRoot; - } - - @Override - public String getRemainingText(RadixNode currentNode, String key) { - return key.substring(currentNode.getKey().length()); - } - - - } - - /** - * Regular Expression Algorithm for the radix tree. - * - * It traverses the radix tree and matches expressions like ${YEAR} etc. with their allowable values e.g. 2014 - */ - public static class FeedRegexAlgorithm implements INodeAlgorithm { - - /** - * This function matches a feed path template with feed instance's path string. - * - * Key is assumed to be a feed's path template and inputString is assumed to be an instance's path string. - * Variable/Regex parts of the feed's template are matched against the corresponding parts in inputString - * using regular expression and for other parts a character by character match is performed. - * e.g. Given templateString (/data/cas/${YEAR}/${MONTH}/${DAY}) and inputString (/data/cas/2014/09/09) - * the function will return true. - * @param templateString Node's key (Feed's template path) - * @param inputString inputString String to be matched against templateString(instance's path) - * @return true if the templateString and inputString match, false otherwise. - */ - @Override - public boolean match(String templateString, String inputString) { - if (StringUtils.isBlank(templateString)) { - return false; - } - // Divide the templateString and inputString into templateParts of regex and character matches - List<String> templateParts = getPartsInPathTemplate(templateString); - List<String> inputStringParts = getCorrespondingParts(inputString, templateParts); - - if (inputStringParts.size() != templateParts.size()) { - return false; - } - - int counter = 0; - while (counter < inputStringParts.size()) { - if (!matchPart(templateParts.get(counter), inputStringParts.get(counter))) { - return false; - } - counter++; - } - return true; - } - - - /** - * - * Finds if the current node's key is a prefix of the given inputString or not. - * - * @param inputTemplate inputTemplate String - * @param inputString inputString to be checked - * @return true if inputString starts with inputTemplate, false otherwise. 
- */ - @Override - public boolean startsWith(String inputTemplate, String inputString) { - - if (StringUtils.isBlank(inputString)) { - return false; - } - if (StringUtils.isBlank(inputTemplate)) { - return true; - } - - // divide inputTemplate and inputString into corresponding templateParts of regex and character only strings - List<String> templateParts = getPartsInPathTemplate(inputTemplate); - List<String> remainingPattern = getCorrespondingParts(inputString, templateParts); - - if (templateParts.size() > remainingPattern.size()) { - return false; - } - - int counter = 0; - // compare part by part till the templateParts end - for (String templatePart : templateParts) { - String part = remainingPattern.get(counter); - if (!matchPart(templatePart, part)) { - return false; - } - counter++; - } - return true; - } - - @Override - public RadixNode getNextCandidate(RadixNode currentNode, String input) { - RadixNode newRoot = null; - // replace the regex with pattern's length - String remainingText = input.substring(getPatternsEffectiveLength(currentNode.getKey())); - List<RadixNode> result = currentNode.getChildren(); - for(RadixNode child : result) { - String key = child.getKey(); - if (key.startsWith("${")) { - // get the regex - String regex = key.substring(0, key.indexOf("}") + 1); - // match the text and the regex - FeedDataPath.VARS var = getMatchingRegex(regex); - if (matchPart(regex, remainingText.substring(0, var.getValueSize()))) { - newRoot = child; // if it matches then this is the newRoot - break; - } - } else if (child.getKey().charAt(0) == remainingText.charAt(0)) { - newRoot = child; - break; - } - } - return newRoot; - } - - @Override - public String getRemainingText(RadixNode currentNode, String inputString) { - // find the match length for current inputString - return inputString.substring(getPatternsEffectiveLength(currentNode.getKey())); - } - - private int getPatternsEffectiveLength(String templateString) { - if (StringUtils.isBlank(templateString)) { - return 0; - } - - // Since we are only interested in the length, can replace pattern with a random string - for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) { - templateString = templateString.replace("${" + var.name() + "}", - RandomStringUtils.random(var.getValueSize())); - } - - return templateString.length(); - } - - /** - * Divide a given template string into parts of regex and character strings - * e.g. /data/cas/${YEAR}/${MONTH}/${DAY} will be converted to - * [/data/cas/, ${YEAR}, /, ${MONTH}, /, ${DAY}] - * @param templateString input string representing a feed's path template - * @return list of parts in input templateString which are either completely regex or normal string. 
- */ - private List<String> getPartsInPathTemplate(String templateString) { - //divide the node's templateString in parts of regular expression and normal string - List<String> parts = new ArrayList<String>(); - Matcher matcher = FeedDataPath.PATTERN.matcher(templateString); - int currentIndex = 0; - while (matcher.find()) { - parts.add(templateString.substring(currentIndex, matcher.start())); - parts.add(matcher.group()); - currentIndex = matcher.end(); - } - if (currentIndex != templateString.length()) { - parts.add(templateString.substring(currentIndex)); - } - return Collections.unmodifiableList(parts); - } - - - private FeedDataPath.VARS getMatchingRegex(String inputPart) { - //inputPart will be something like ${YEAR} - for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) { - if (inputPart.equals("${" + var.name() + "}")) { - return var; - } - } - return null; - } - - - /** - * Divides a string into corresponding parts for the template to carry out comparison. - * templateParts = [/data/cas/, ${YEAR}, /, ${MONTH}, /, ${DAY}] - * inputString = /data/cas/2014/09/09 - * returns [/data/cas/, 2014, /, 09, /, 09] - * @param inputString normal string representing feed instance path - * @param templateParts parts of feed's path template broken into regex and non-regex units. - * @return a list of strings where each part of the list corresponds to a part in list of template parts. - */ - private List<String> getCorrespondingParts(String inputString, List<String> templateParts) { - List<String> stringParts = new ArrayList<String>(); - int counter = 0; - while (StringUtils.isNotBlank(inputString) && counter < templateParts.size()) { - String currentTemplatePart = templateParts.get(counter); - int length = Math.min(getPatternsEffectiveLength(currentTemplatePart), inputString.length()); - stringParts.add(inputString.substring(0, length)); - inputString = inputString.substring(length); - counter++; - } - if (StringUtils.isNotBlank(inputString)) { - stringParts.add(inputString); - } - return stringParts; - } - - /** - * Compare a pure regex or pure string part with a given string. - * - * @param template template part, which can either be a pure regex or pure non-regex string. - * @param input input String to be matched against the template part. - * @return true if the input string matches the template, in case of a regex component a regex comparison is - * made, else a character by character comparison is made. - */ - private boolean matchPart(String template, String input) { - if (template.startsWith("${")) { // if the part begins with ${ then it's a regex part, do regex match - template = template.replace("${", "\\$\\{"); - template = template.replace("}", "\\}"); - for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {//find which regex is this - if (StringUtils.equals(var.regex(), template)) {// regex found, do matching - //find part of the input string which should be matched against regex - String desiredPart = input.substring(0, var.getValueSize()); - Pattern pattern = Pattern.compile(var.getValuePattern()); - Matcher matcher = pattern.matcher(desiredPart); - if (!matcher.matches()) { - return false; - } - return true; - } - } - return false; - } else {// do exact match with normal strings - if (!input.startsWith(template)) { - return false; - } - } - return true; - } - } -}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/HadoopQueueUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/HadoopQueueUtil.java b/common/src/main/java/org/apache/falcon/util/HadoopQueueUtil.java deleted file mode 100644 index cc48402..0000000 --- a/common/src/main/java/org/apache/falcon/util/HadoopQueueUtil.java +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.commons.io.IOUtils; -import org.apache.falcon.FalconException; -import org.apache.hadoop.security.authentication.client.AuthenticatedURL; -import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.HashSet; -import java.util.Set; - -/** - * Utility class to get the Hadoop Queue names by querying resource manager. - */ -public final class HadoopQueueUtil { - - private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueUtil.class); - - private HadoopQueueUtil() { - // make the constructor private - } - - /** - * Uses Resource Manager REST API to get the hadoop scheduler info. 
- * - * @param rmBaseUrlStr - * @return JSON string representing hadoop Scheduler Info - * @throws FalconException - */ - - public static String getHadoopClusterSchedulerInfo(String rmBaseUrlStr) throws FalconException { - KerberosAuthenticator kAUTHENTICATOR = new KerberosAuthenticator(); - AuthenticatedURL.Token authenticationToken = new AuthenticatedURL.Token(); - String rmSchedulerInfoURL = rmBaseUrlStr; - if (!rmSchedulerInfoURL.endsWith("/")) { - rmSchedulerInfoURL += "/"; - } - rmSchedulerInfoURL += "ws/v1/cluster/scheduler"; - HttpURLConnection conn = null; - BufferedReader reader = null; - - try { - URL url = new URL(rmSchedulerInfoURL); - conn = new AuthenticatedURL(kAUTHENTICATOR).openConnection(url, authenticationToken); - reader = new BufferedReader(new InputStreamReader(conn.getInputStream())); - StringBuilder jsonResponse = new StringBuilder(); - String line; - while ((line = reader.readLine()) != null) { - jsonResponse.append(line); - } - return jsonResponse.toString(); - } catch (Exception ex) { - throw new RuntimeException("Could not authenticate, " + ex.getMessage(), ex); - } finally { - IOUtils.closeQuietly(reader); - if (conn != null) { - conn.disconnect(); - } - } - - } - - /** - * - * - * @param jsonResult - * @param qNames - * @return - * @throws JSONException - */ - - public static Set<String> getHadoopClusterQueueNamesHelper(String jsonResult, Set<String> qNames) - throws JSONException { - String qJson = extractRootQueuesElement(jsonResult); - LOG.debug("Extracted Queue JSON - {}", qJson); - JSONObject jObject = new JSONObject(qJson); - LOG.debug("Parsing Json result done"); - JSONObject queues = jObject.getJSONObject("queues"); - jsonParseForHadoopQueueNames(queues, qNames); - return qNames; - } - - /** - * Recursively parses JSON hadoop cluster scheduler info and returns all the sub queue names in the output - * parameter. - * - * @param queues JSON document queues element - * @param qNames Output parameter that will have all hadoop cluster queue names - * @throws JSONException - * - */ - public static void jsonParseForHadoopQueueNames(JSONObject queues, Set<String> qNames) throws JSONException { - JSONArray qs = queues.getJSONArray("queue"); - for(int i=0; i<qs.length(); i++) { - JSONObject q = qs.getJSONObject(i); - qNames.add(q.getString("queueName")); - - if ((q.isNull("type")) - || (!q.getString("type").equalsIgnoreCase("capacitySchedulerLeafQueueInfo"))) { - jsonParseForHadoopQueueNames(q.getJSONObject("queues"), qNames); - } - } - } - /** - * Parse the hadoop cluster scheduler info to extract JSON element 'queues'. - * - * NOTE: the JSON returned by Resource Manager REST API is not well formed - * and trying to parse the entire returned document results in parse exception - * using latest JSON parsers. - * - * @param json - * @return - */ - - public static String extractRootQueuesElement(String json) { - int start = json.indexOf("\"queues\":"); - int i = start; - while(json.charAt(i) != '{') { - i++; - } - i++; - int count = 1; - while (count != 0) { - if (json.charAt(i) == '{') { - count++; - } else if (json.charAt(i) == '}') { - count--; - } - i++; - } - return "{" + json.substring(start, i) + "}"; - } - - /** - * Retrieves scheduler info JSON from the resource manager and extracts hadoop cluster queue names into - * a set of strings. 
- * - * @param rmBaseUrlStr - * @return - * @throws FalconException - */ - - public static Set<String> getHadoopClusterQueueNames(String rmBaseUrlStr) throws FalconException { - String jsonResult = getHadoopClusterSchedulerInfo(rmBaseUrlStr); - LOG.debug("Scheduler Info Result : {} ", jsonResult); - Set<String> qNames = new HashSet<>(); - try { - return getHadoopClusterQueueNamesHelper(jsonResult, qNames); - } catch(JSONException jex) { - throw new FalconException(jex); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java b/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java deleted file mode 100644 index bacc092..0000000 --- a/common/src/main/java/org/apache/falcon/util/HdfsClassLoader.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Helper class loader that fetches jars from HDFS location and loads into JVM. 
- */ - -public class HdfsClassLoader extends URLClassLoader { - - private static final Logger LOG = LoggerFactory.getLogger(HdfsClassLoader.class); - private static Map<String, HdfsClassLoader> classLoaderCache = new ConcurrentHashMap<String, HdfsClassLoader>(); - private static final Object LOCK = new Object(); - - public static ClassLoader load(final String name, final List<String> jarHdfsPath) throws IOException { - LOG.info("ClassLoader cache size = " + classLoaderCache.size()); - if (classLoaderCache.containsKey(name)) { - return classLoaderCache.get(name); - } - - synchronized (LOCK) { - final URL[] urls = copyHdfsJarFilesToTempDir(name, jarHdfsPath); - LOG.info("Copied jar files from HDFS to local dir"); - final ClassLoader parentClassLoader = HdfsClassLoader.class.getClassLoader(); - HdfsClassLoader hdfsClassLoader = java.security.AccessController.doPrivileged( - new java.security.PrivilegedAction<HdfsClassLoader>() { - @Override - public HdfsClassLoader run() { - return new HdfsClassLoader(name, urls, parentClassLoader); - } - } - ); - LOG.info("Created a new HdfsClassLoader for name = {} with parent = {} using classpath = {}", - name, parentClassLoader.toString(), Arrays.toString(jarHdfsPath.toArray())); - classLoaderCache.put(name, hdfsClassLoader); - return hdfsClassLoader; - } - } - - private final ClassLoader realParent; - - public HdfsClassLoader(String name, URL[] urls, ClassLoader parentClassLoader) { - // set the 'parent' member to null giving an option for this class loader - super(urls, null); - this.realParent = parentClassLoader; - } - - @Override - protected Class<?> loadClass(String name, boolean resolve) - throws ClassNotFoundException { - - // Load through the parent class loader first and then fallback to this class loader. - try { - return realParent.loadClass(name); - } catch (Throwable t) { - return super.loadClass(name, resolve); - } - } - - @Override - public URL getResource(String name) { - // This is the same as the jdk's getResource except the parent - // is taken from the realParent member instead of the parent member. 
- URL url = realParent.getResource(name); - if (url == null) { - url = findResource(name); - } - return url; - } - - private static URL[] copyHdfsJarFilesToTempDir(String databaseName, List<String> jars) throws IOException { - List<URL> urls = new ArrayList<URL>(); - - final Configuration conf = new Configuration(); - Path localPath = createTempDir(databaseName, conf); - - for (String jar : jars) { - Path jarPath = new Path(jar); - final FileSystem fs = jarPath.getFileSystem(conf); - if (fs.isFile(jarPath) && jarPath.getName().endsWith(".jar")) { - LOG.info("Copying jarFile = " + jarPath); - fs.copyToLocalFile(jarPath, localPath); - } - } - urls.addAll(getJarsInPath(localPath.toUri().toURL())); - - return urls.toArray(new URL[urls.size()]); - } - - private static Path createTempDir(String databaseName, Configuration conf) throws IOException { - String tmpBaseDir = String.format("file://%s", System.getProperty("java.io.tmpdir")); - if (StringUtils.isBlank(tmpBaseDir)) { - tmpBaseDir = "file:///tmp"; - } - Path localPath = new Path(tmpBaseDir, databaseName); - localPath.getFileSystem(conf).mkdirs(localPath); - return localPath; - } - - private static List<URL> getJarsInPath(URL fileURL) throws MalformedURLException { - List<URL> urls = new ArrayList<URL>(); - - File file = new File(fileURL.getPath()); - if (file.isDirectory()) { - File[] jarFiles = file.listFiles(new FileFilter() { - @Override - public boolean accept(File file) { - return file.isFile() && file.getName().endsWith(".jar"); - } - }); - - if (jarFiles != null) { - for (File jarFile : jarFiles) { - urls.add(jarFile.toURI().toURL()); - } - } - - if (!fileURL.toString().endsWith("/")) { - fileURL = new URL(fileURL.toString() + "/"); - } - } - - urls.add(fileURL); - return urls; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/RadixNode.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/RadixNode.java b/common/src/main/java/org/apache/falcon/util/RadixNode.java deleted file mode 100644 index 35d7ef3..0000000 --- a/common/src/main/java/org/apache/falcon/util/RadixNode.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.commons.lang3.StringUtils; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - -/** - * Represents a node in Radix Tree. 
- * - * Each node contains a part of the key, links to it's children and a collection of values - * stored against the key(if the node is the suffix of a key) - * - */ -public class RadixNode<T> { - - private String key; - - private List<RadixNode<T>> children; - - private boolean isTerminal; - - private Set<T> values; - - public RadixNode(){ - key = ""; - children = new LinkedList<RadixNode<T>>(); - isTerminal = false; - values = new HashSet<T>(); - } - - public String getKey() { - return key; - } - - public void setKey(String key) { - this.key = key; - } - - public List<RadixNode<T>> getChildren() { - return children; - } - - public void setChildren(List<RadixNode<T>> children) { - this.children = children; - } - - public boolean isTerminal() { - return isTerminal; - } - - public void setTerminal(boolean isTerminalNew) { - this.isTerminal = isTerminalNew; - } - - /** - * Root node is the node with a token string(empty String in our case) - * as key. - * - * @return True if the node is root Node, False otherwise - */ - public boolean isRoot(){ - return StringUtils.equals(key, ""); - } - - public Collection<T> getValues() { - return Collections.unmodifiableCollection(values); - } - - public void setValues(Collection<T> newValues) { - values = new HashSet<T>(); - values.addAll(newValues); - } - - public void addValue(T value){ - values.add(value); - } - - public void removeValue(T value) { - values.remove(value); - } - public void removeAll() { - values.clear(); - } - - public boolean containsValue(T value){ - return values.contains(value); - } - - public int getMatchLength(String input){ - int matchLength = 0; - - if (input == null){ - return 0; - } - - while(matchLength < key.length() - && matchLength < input.length() - && input.charAt(matchLength) == key.charAt(matchLength)){ - matchLength += 1; - } - - return matchLength; - } - - - /** - * Finds the length of the match between node's key and input. - * - * It can do either a character by character match or a regular expression match(used to match a feed instance path - * with feed location template). Only regular expressions allowed in the feed path are evaluated for matching. - * @param input input string to be matched with the key of the node. - * @param matcher A custom matcher algorithm to match node's key against the input. It is used when matching - * path of a Feed's instance to Feed's path template. - * @return - */ - public boolean matches(String input, FalconRadixUtils.INodeAlgorithm matcher) { - if (input == null) { - return false; - } - - if (matcher == null) { - return StringUtils.equals(getKey(), input); - } - - return matcher.match(this.getKey(), input); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/RadixTree.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/RadixTree.java b/common/src/main/java/org/apache/falcon/util/RadixTree.java deleted file mode 100644 index a667506..0000000 --- a/common/src/main/java/org/apache/falcon/util/RadixTree.java +++ /dev/null @@ -1,432 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.entity.store.FeedPathStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.util.Collection; -import java.util.Collections; -import java.util.Formattable; -import java.util.Formatter; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - - -/** - * A thread-safe Radix Tree implementation of the LocationStore. - * - * - * A radix tree (also patricia trie or radix trie or compact prefix tree) is a space-optimized - * trie data structure where each node with only one child is merged with its parent. - * - * For example the tree representation for the following (key,value) pairs - - * [("key1", "value1"), ("key123", "Key was key123"), ("key124", "Key was key124"), - * ("key2", "value2"), ("random", "random")] will be as below. - * - * | - * |-key - * |--1[[value1]]* - * |---2 - * |----3[[Key was key123]]* - * |----4[[Key was key124]]* - * |--2[[value2]]* - * |-random[[random]]* - * - * For more details on Radix Tree please refer - * <a href="http://en.wikipedia.org/wiki/Radix_tree">Radix Tree</a> - * @param <T> Type of value being stored against the key. - */ -public class RadixTree<T> implements FeedPathStore<T>, Formattable { - private static final Logger LOG = LoggerFactory.getLogger(RadixTree.class); - - protected RadixNode<T> root; - - private int size; - - public RadixTree(){ - root = new RadixNode<T>(); - root.setKey(""); - size = 0; - } - - /** - * Return the number of keys stored in the tree. - * - * Since all keys end in terminal nodes and duplicate keys are not allowed, - * size is equal to the number of terminal nodes in the tree. - * @return number of keys in the tree. - */ - @Override - public synchronized int getSize() { - return size; - } - - /** - * Insert a <key, value> pair in the Radix Tree. - * - * @param key Key to be stored - * @param value Value to be stored against that key - */ - @Override - public synchronized void insert(@Nullable String key, @Nonnull T value){ - if (key != null && !key.trim().isEmpty()){ - LOG.debug("Insert called for key: {} and value: {}", key.trim(), value); - insertKeyRecursive(key.trim(), value, root); - } - } - - private void insertKeyRecursive(String remainingText, T value, RadixNode<T> currentNode){ - - int currentMatchLength = currentNode.getMatchLength(remainingText); - String newRemainingText = remainingText.substring(currentMatchLength, remainingText.length()); - - // if root or current node key is subset of the input key GO DOWN - if (currentNode.isRoot() - || (currentMatchLength == currentNode.getKey().length() - && currentMatchLength < remainingText.length())){ - - // if a path to go down exists then go down that path - boolean foundPath = false; - for(RadixNode<T> child: currentNode.getChildren()){ - if (child.getKey().charAt(0) == newRemainingText.charAt(0)){ - insertKeyRecursive(newRemainingText, value, child); - foundPath = true; - break; - } - } - // else create a new node. 
- if (!foundPath){ - RadixNode<T> node = new RadixNode<T>(); - node.setKey(newRemainingText); - node.addValue(value); - node.setTerminal(true); - currentNode.getChildren().add(node); - size += 1; - } - }else if (currentMatchLength == remainingText.length() && currentMatchLength < currentNode.getKey().length()){ - // if remainingText is subset of the current node key - RadixNode<T> node = new RadixNode<T>(); - node.setChildren(currentNode.getChildren()); - node.setKey(currentNode.getKey().substring(currentMatchLength)); - node.setValues(currentNode.getValues()); - node.setTerminal(currentNode.isTerminal()); - - currentNode.setChildren(new LinkedList<RadixNode<T>>()); - currentNode.getChildren().add(node); - currentNode.setTerminal(true); - currentNode.setKey(currentNode.getKey().substring(0, currentMatchLength)); - currentNode.removeAll(); - currentNode.addValue(value); - - size += 1; - - }else if (currentMatchLength < remainingText.length() && currentMatchLength < currentNode.getKey().length()){ - - //add new Node and move all current node's children and value to it - RadixNode<T> node = new RadixNode<T>(); - node.setChildren(currentNode.getChildren()); - node.setTerminal(currentNode.isTerminal()); - node.setValues(currentNode.getValues()); - node.setKey(currentNode.getKey().substring(currentMatchLength, currentNode.getKey().length())); - - // add node for the text - RadixNode<T> node2 = new RadixNode<T>(); - node2.setKey(newRemainingText); - node2.setTerminal(true); - node2.addValue(value); - - //update current node to be new root - currentNode.setTerminal(false); - currentNode.setKey(currentNode.getKey().substring(0, currentMatchLength)); - currentNode.setChildren(new LinkedList<RadixNode<T>>()); - currentNode.getChildren().add(node); - currentNode.getChildren().add(node2); - - size += 1; - }else if (currentMatchLength == remainingText.length() && currentMatchLength == currentNode.getKey().length()){ - // if current node key and input key both match equally - if (currentNode.isTerminal()){ - currentNode.addValue(value); - }else { - currentNode.setTerminal(true); - currentNode.addValue(value); - } - size += 1; - } - } - - /** - * Find the value for the given key if it exists in the tree, null otherwise. - * - * A key is said to exist in the tree if we can generate exactly that string - * by going down from root to a terminal node. If a key exists we return the value - * stored at the terminal node. - * - * @param key - input key to be searched. 
- * @return T Value of the key if it exists, null otherwise - */ - @Override - @Nullable - public synchronized Collection<T> find(@Nonnull String key, FalconRadixUtils.INodeAlgorithm algorithm) { - if (key != null && !key.trim().isEmpty()) { - if (algorithm == null) { - algorithm = new FalconRadixUtils.StringAlgorithm(); - } - return recursiveFind(key.trim(), root, algorithm); - } - return null; - } - - @Nullable - @Override - public Collection<T> find(@Nonnull String key) { - if (key != null && !key.trim().isEmpty()) { - FalconRadixUtils.INodeAlgorithm algorithm = new FalconRadixUtils.StringAlgorithm(); - return recursiveFind(key.trim(), root, algorithm); - } - return null; - } - - private Collection<T> recursiveFind(String key, RadixNode<T> currentNode, - FalconRadixUtils.INodeAlgorithm algorithm){ - - if (!algorithm.startsWith(currentNode.getKey(), key)){ - LOG.debug("Current Node key: {} is not a prefix in the input key: {}", currentNode.getKey(), key); - return null; - } - - if (algorithm.match(currentNode.getKey(), key)){ - if (currentNode.isTerminal()){ - LOG.debug("Found the terminal node with key: {} for the given input.", currentNode.getKey()); - return currentNode.getValues(); - }else { - LOG.debug("currentNode is not terminal. Current node's key is {}", currentNode.getKey()); - return null; - } - } - - //find child to follow, using remaining Text - RadixNode<T> newRoot = algorithm.getNextCandidate(currentNode, key); - String remainingText = algorithm.getRemainingText(currentNode, key); - - if (newRoot == null){ - LOG.debug("No child found to follow for further processing. Current node key {}"); - return null; - }else { - LOG.debug("Recursing with new key: {} and new remainingText: {}", newRoot.getKey(), remainingText); - return recursiveFind(remainingText, newRoot, algorithm); - } - } - - /** - * Deletes a given key,value pair from the Radix Tree. 
- * - * @param key key to be deleted - * @param value value to be deleted - */ - @Override - public synchronized boolean delete(@Nonnull String key, @Nonnull T value) { - if (key != null && !key.trim().isEmpty()){ - LOG.debug("Delete called for key:{}", key.trim()); - return recursiveDelete(key, null, root, value); - } - return false; - } - - private boolean recursiveDelete(String key, RadixNode<T> parent, RadixNode<T> currentNode, T value){ - LOG.debug("Recursing with key: {}, currentNode: {}", key, currentNode.getKey()); - if (!key.startsWith(currentNode.getKey())){ - LOG.debug("Current node's key: {} is not a prefix of the remaining input key: {}", - currentNode.getKey(), key); - return false; - } - - if (StringUtils.equals(key, currentNode.getKey())){ - LOG.trace("Current node's key:{} and the input key:{} matched", currentNode.getKey(), key); - if (currentNode.getValues().contains(value)){ - LOG.debug("Given value is found in the collection of values against the given key"); - currentNode.removeValue(value); - size -= 1; - if (currentNode.getValues().size() == 0){ - LOG.debug("Exact match between current node's key: {} and remaining input key: {}", - currentNode.getKey(), key); - if (currentNode.isTerminal()){ - //if child has no children & only one value, then delete and compact parent if needed - if (currentNode.getChildren().size() == 0){ - Iterator<RadixNode<T>> it = parent.getChildren().iterator(); - while(it.hasNext()){ - if (StringUtils.equals(it.next().getKey(), currentNode.getKey())){ - it.remove(); - LOG.debug("Deleting the node"); - break; - } - } - }else if (currentNode.getChildren().size() > 1){ - // if child has more than one children just mark non terminal - currentNode.setTerminal(false); - }else if (currentNode.getChildren().size() == 1){ - // if child has only one child then compact node - LOG.debug("compacting node with child as node to be deleted has only 1 child"); - RadixNode<T> child = currentNode.getChildren().get(0); - currentNode.setChildren(child.getChildren()); - currentNode.setTerminal(child.isTerminal()); - currentNode.setKey(currentNode.getKey() + child.getKey()); - currentNode.setValues(child.getValues()); - } - - //parent can't be null as root will never match with input key as it is not a terminal node. - if (!parent.isTerminal() && !parent.isRoot()){ - // if only one child left in parent and parent is not root then join parent - // and the only child key - if (parent.getChildren().size() == 1){ - RadixNode<T> onlyChild = parent.getChildren().get(0); - String onlyChildKey = onlyChild.getKey(); - LOG.debug("Compacting child: {} and parent: {}", onlyChildKey, parent.getKey()); - parent.setKey(parent.getKey() + onlyChildKey); - parent.setChildren(onlyChild.getChildren()); - parent.setTerminal(onlyChild.isTerminal()); - parent.setValues(onlyChild.getValues()); - } - } - return true; - }else{ - LOG.debug("Key found only as a prefix and not at a terminal node"); - return false; - } - } - return true; - }else { - LOG.debug("Current value is not found in the collection of values against the given key, no-op"); - return false; - } - } - - LOG.debug("Current node's key: {} is a prefix of the input key: {}", currentNode.getKey(), key); - //find child to follow - RadixNode<T> newRoot = null; - String remainingKey = key.substring(currentNode.getMatchLength(key)); - for(RadixNode<T> el : currentNode.getChildren()){ - LOG.trace("Finding next child to follow. 
Current child's key:{}", el.getKey()); - if (el.getKey().charAt(0) == remainingKey.charAt(0)){ - newRoot = el; - break; - } - } - - if (newRoot == null){ - LOG.debug("No child was found with common prefix with the remainder key: {}", key); - return false; - }else { - LOG.debug("Found a child's key: {} with common prefix, recursing on it", newRoot.getKey()); - return recursiveDelete(remainingKey, currentNode, newRoot, value); - } - } - - - /** - * Useful for debugging. - */ - @Override - public void formatTo(Formatter formatter, int flags, int width, int precision) { - formatNodeTo(formatter, 0, root); - - } - - private void formatNodeTo(Formatter formatter, int level, RadixNode<T> node){ - for (int i = 0; i < level; i++) { - formatter.format(" "); - } - formatter.format("|"); - for (int i = 0; i < level; i++) { - formatter.format("-"); - } - - if (node.isTerminal()){ - formatter.format("%s[%s]*%n", node.getKey(), node.getValues()); - }else{ - formatter.format("%s%n", node.getKey()); - } - - for (RadixNode<T> child : node.getChildren()) { - formatNodeTo(formatter, level + 1, child); - } - } - - /** - * Find List of substring of keys which have given input as a prefix. - * - * @param key - Input string for which all Suffix Children should be returned - * @param limit - Maximum Number of results. If limit is less than 0 then all nodes are returned. - * If limit is 0 then returns null. - */ - @javax.annotation.Nullable - public List<String> findSuffixChildren(String key, int limit){ - if (key == null || limit == 0){ - return null; - } - RadixNode<T> currentNode = root; - String remainingText = key.trim(); - List<String> result = new LinkedList<String>(); - do{ - boolean flag = false; - // find the child with common prefix - for(RadixNode<T> child: currentNode.getChildren()){ - LOG.debug("Checking for child key: {} against remainingText: {}", child.getKey(), remainingText); - if (child.getKey().charAt(0) == remainingText.charAt(0)){ - LOG.debug("Child key: {} found to have overlap with the remainingText: {}", child.getKey(), - remainingText); - flag = true; - - //if entire key doesn't match return null - if (!remainingText.startsWith(child.getKey())){ - return null; - } - - // if entire key equals remainingText - return it's children up to the specified limit - if (StringUtils.equals(child.getKey(), remainingText)){ - int counter = 0; - - for(RadixNode<T> suffixChild: child.getChildren()){ - if (limit < 0 || counter < limit){ - result.add(suffixChild.getKey()); - } - } - return Collections.unmodifiableList(result); - } - - //if entire key matches but it is not equal to entire remainingText - repeat - remainingText = remainingText.substring(child.getKey().length()); - currentNode = child; - break; - - } - } - // if no child found with common prefix return null; - if (!flag){ - return null; - } - }while (true); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java deleted file mode 100644 index 80022e0..0000000 --- a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.falcon.FalconException; - -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; - -/** - * Helper methods for class instantiation through reflection. - */ -public final class ReflectionUtils { - - private ReflectionUtils() {} - - public static <T> T getInstance(String classKey) throws FalconException { - return ReflectionUtils.<T>getInstanceByClassName(StartupProperties.get().getProperty(classKey)); - } - - public static <T> T getInstance(String classKey, Class<?> argCls, Object arg) throws FalconException { - return ReflectionUtils.<T>getInstanceByClassName(StartupProperties.get().getProperty(classKey), argCls, arg); - } - - @SuppressWarnings("unchecked") - public static <T> T getInstanceByClassName(String clazzName) throws FalconException { - try { - Class<T> clazz = (Class<T>) ReflectionUtils.class.getClassLoader().loadClass(clazzName); - try { - return clazz.newInstance(); - } catch (IllegalAccessException e) { - Method method = clazz.getMethod("get"); - return (T) method.invoke(null); - } - } catch (Exception e) { - throw new FalconException("Unable to get instance for " + clazzName, e); - } - } - - /** - * Invokes constructor with one argument. - * @param clazzName - classname - * @param argCls - Class of the argument - * @param arg - constructor argument - * @param <T> - instance type - * @return Class instance - * @throws FalconException - */ - @SuppressWarnings("unchecked") - public static <T> T getInstanceByClassName(String clazzName, Class<?> argCls, Object arg) throws - FalconException { - try { - Class<T> clazz = (Class<T>) ReflectionUtils.class.getClassLoader().loadClass(clazzName); - Constructor<T> constructor = clazz.getConstructor(argCls); - return constructor.newInstance(arg); - } catch (Exception e) { - throw new FalconException("Unable to get instance for " + clazzName, e); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java b/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java deleted file mode 100644 index a8b99bb..0000000 --- a/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -/** - * enum for DistCp options. - */ -public enum ReplicationDistCpOption { - - DISTCP_OPTION_OVERWRITE("overwrite"), - DISTCP_OPTION_IGNORE_ERRORS("ignoreErrors"), - DISTCP_OPTION_SKIP_CHECKSUM("skipChecksum"), - DISTCP_OPTION_REMOVE_DELETED_FILES("removeDeletedFiles"), - DISTCP_OPTION_PRESERVE_BLOCK_SIZE("preserveBlockSize"), - DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER("preserveReplicationNumber"), - DISTCP_OPTION_PRESERVE_PERMISSIONS("preservePermission"); - - private final String name; - - ReplicationDistCpOption(String name) { - this.name = name; - } - - public String getName() { - return name; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java b/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java deleted file mode 100644 index 714a64d..0000000 --- a/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.falcon.FalconException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Dynamic properties that may be modified while the server is running. 
- */ -public final class RuntimeProperties extends ApplicationProperties { - - private static final Logger LOG = LoggerFactory.getLogger(RuntimeProperties.class); - - private static final String PROPERTY_FILE = "runtime.properties"; - - private static final AtomicReference<RuntimeProperties> INSTANCE = - new AtomicReference<RuntimeProperties>(); - - private RuntimeProperties() throws FalconException { - super(); - } - - @Override - protected String getPropertyFile() { - return PROPERTY_FILE; - } - - public static Properties get() { - try { - if (INSTANCE.get() == null) { - RuntimeProperties properties = new RuntimeProperties(); - properties.loadProperties(); - properties.validateProperties(); - INSTANCE.compareAndSet(null, properties); - if (INSTANCE.get() == properties) { - Thread refreshThread = new Thread(new DynamicLoader()); - refreshThread.start(); - } - } - return INSTANCE.get(); - } catch (FalconException e) { - throw new RuntimeException("Unable to read application " + "runtime properties", e); - } - } - - protected void validateProperties() throws FalconException { - String colosProp = getProperty("all.colos"); - if (colosProp == null || colosProp.isEmpty()) { - return; - } - String[] colos = colosProp.split(","); - for (int i = 0; i < colos.length; i++) { - colos[i] = colos[i].trim(); - String falconEndpoint = getProperty("falcon." + colos[i] + ".endpoint"); - if (falconEndpoint == null || falconEndpoint.isEmpty()) { - throw new FalconException("No falcon server endpoint mentioned in Prism runtime for colo, " - + colos[i] + "."); - } - } - } - - /** - * Thread for loading properties periodically. - */ - private static final class DynamicLoader implements Runnable { - - private static final long REFRESH_DELAY = 300000L; - private static final int MAX_ITER = 20; //1hr - - @Override - public void run() { - long backOffDelay = REFRESH_DELAY; - while (true) { - try { - Thread.sleep(Math.min(MAX_ITER * REFRESH_DELAY, backOffDelay)); - try { - RuntimeProperties newProperties = new RuntimeProperties(); - newProperties.loadProperties(); - newProperties.validateProperties(); - INSTANCE.set(newProperties); - backOffDelay = REFRESH_DELAY; - } catch (FalconException e) { - LOG.warn("Error refreshing runtime properties", e); - backOffDelay += REFRESH_DELAY; - } - } catch (InterruptedException e) { - LOG.error("Application is stopping. Aborting..."); - break; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/StartupProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/StartupProperties.java b/common/src/main/java/org/apache/falcon/util/StartupProperties.java deleted file mode 100644 index 7522b0d..0000000 --- a/common/src/main/java/org/apache/falcon/util/StartupProperties.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.falcon.FalconException; - -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Properties read during application startup. - */ -public final class StartupProperties extends ApplicationProperties { - - private static final String PROPERTY_FILE = "startup.properties"; - - private static final AtomicReference<StartupProperties> INSTANCE = - new AtomicReference<StartupProperties>(); - - private StartupProperties() throws FalconException { - super(); - } - - @Override - protected String getPropertyFile() { - return PROPERTY_FILE; - } - - public static Properties get() { - try { - if (INSTANCE.get() == null) { - INSTANCE.compareAndSet(null, new StartupProperties()); - } - return INSTANCE.get(); - } catch (FalconException e) { - throw new RuntimeException("Unable to read application " + "startup properties", e); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/StateStoreProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/StateStoreProperties.java b/common/src/main/java/org/apache/falcon/util/StateStoreProperties.java deleted file mode 100644 index a3e6a56..0000000 --- a/common/src/main/java/org/apache/falcon/util/StateStoreProperties.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.expression.ExpressionHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Properties for State Store during application startup. 
- */ -public final class StateStoreProperties extends ApplicationProperties { - - private static final Logger LOG = LoggerFactory.getLogger(StateStoreProperties.class); - - private static final String PROPERTY_FILE = "statestore.properties"; - private static final String CREDENTIALS_FILE= "falcon.statestore.credentials.file"; - private static final String DEFAULT_CREDENTIALS_FILE = "statestore.credentials"; - - private static final AtomicReference<StateStoreProperties> INSTANCE = - new AtomicReference<>(); - - - protected StateStoreProperties() throws FalconException { - super(); - } - - @Override - protected String getPropertyFile() { - return PROPERTY_FILE; - } - - @Override - protected void loadProperties() throws FalconException { - super.loadProperties(); - - String credentialsFile = (String)get(CREDENTIALS_FILE); - try { - InputStream resourceAsStream = null; - if (StringUtils.isNotBlank(credentialsFile)) { - resourceAsStream = getResourceAsStream(new File(credentialsFile)); - } - // fall back to class path. - if (resourceAsStream == null) { - resourceAsStream = checkClassPath(DEFAULT_CREDENTIALS_FILE); - } - if (resourceAsStream != null) { - try { - loadCredentials(resourceAsStream); - return; - } finally { - IOUtils.closeQuietly(resourceAsStream); - } - } else { - throw new FalconException("Unable to find state store credentials file"); - } - } catch (IOException e) { - throw new FalconException("Error loading properties file: " + getPropertyFile(), e); - } - } - - private void loadCredentials(InputStream resourceAsStream) throws IOException { - Properties origProps = new Properties(); - origProps.load(resourceAsStream); - LOG.info("Initializing {} properties with domain {}", this.getClass().getName(), domain); - Set<String> keys = getKeys(origProps.keySet()); - for (String key : keys) { - String value = origProps.getProperty(domain + "." + key, origProps.getProperty("*." + key)); - if (value != null) { - value = ExpressionHelper.substitute(value); - LOG.debug("{}={}", key, value); - put(key, value); - } - } - } - - - public static Properties get() { - try { - if (INSTANCE.get() == null) { - INSTANCE.compareAndSet(null, new StateStoreProperties()); - } - return INSTANCE.get(); - } catch (FalconException e) { - throw new RuntimeException("Unable to read application state store properties", e); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java deleted file mode 100644 index c713712..0000000 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.workflow; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory; -import org.apache.falcon.util.ReflectionUtils; -import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * Factory for providing appropriate workflow engine to the falcon service. - */ -@SuppressWarnings("unchecked") -public final class WorkflowEngineFactory { - - private static final Logger LOG = LoggerFactory.getLogger(WorkflowEngineFactory.class); - public static final String ENGINE_PROP="falcon.scheduler"; - private static AbstractWorkflowEngine nativeWorkflowEngine; - private static AbstractWorkflowEngine configuredWorkflowEngine; - private static final String CONFIGURED_WORKFLOW_ENGINE = "workflow.engine.impl"; - private static final String LIFECYCLE_ENGINE = "lifecycle.engine.impl"; - - private WorkflowEngineFactory() { - } - - /** - * @param entity - * @return The workflow engine using which the entity is scheduled. - * @throws FalconException - */ - public static AbstractWorkflowEngine getWorkflowEngine(Entity entity) throws FalconException { - // The below check is only for schedulable entities. - if (entity != null - && entity.getEntityType().isSchedulable() && getNativeWorkflowEngine().isActive(entity)) { - LOG.debug("Returning native workflow engine for entity {}", entity.getName()); - return nativeWorkflowEngine; - } - LOG.debug("Returning configured workflow engine for entity {}.", entity); - return getWorkflowEngine(); - } - - /** - * @param entity - * @param props - * @return Workflow engine as specified in the props and for a given schedulable entity. - * @throws FalconException - */ - public static AbstractWorkflowEngine getWorkflowEngine(Entity entity, Map<String, String> props) - throws FalconException { - // If entity is null or not schedulable and the engine property is not specified, return the configured WE. - if (entity == null || !entity.getEntityType().isSchedulable()) { - LOG.debug("Returning configured workflow engine for entity {}.", entity); - return getWorkflowEngine(); - } - - // Default to configured workflow engine when no properties are specified. 
- String engineName = getWorkflowEngine().getName(); - if (props != null && props.containsKey(ENGINE_PROP)) { - engineName = props.get(ENGINE_PROP); - } - - if (engineName.equalsIgnoreCase(getWorkflowEngine().getName())) { - // If already active on native - if (getNativeWorkflowEngine().isActive(entity)) { - throw new FalconException("Entity " + entity.getName() + " is already scheduled on native engine."); - } - LOG.debug("Returning configured workflow engine for entity {}", entity.getName()); - return configuredWorkflowEngine; - } else if (engineName.equalsIgnoreCase(getNativeWorkflowEngine().getName())) { - // If already active on configured workflow engine - if (getWorkflowEngine().isActive(entity)) { - throw new FalconException("Entity " + entity.getName() + " is already scheduled on " - + "configured workflow engine."); - } - LOG.debug("Returning native workflow engine for entity {}", entity.getName()); - return nativeWorkflowEngine; - } else { - throw new IllegalArgumentException("Property " + ENGINE_PROP + " is not set to a valid value."); - } - } - - /** - * @return An instance of the configured workflow engine. - * @throws FalconException - */ - public static AbstractWorkflowEngine getWorkflowEngine() throws FalconException { - // Caching is only for optimization, workflow engine doesn't need to be a singleton. - if (configuredWorkflowEngine == null) { - configuredWorkflowEngine = ReflectionUtils.getInstance(CONFIGURED_WORKFLOW_ENGINE); - } - return configuredWorkflowEngine; - } - - public static AbstractWorkflowEngine getNativeWorkflowEngine() throws FalconException { - if (nativeWorkflowEngine == null) { - nativeWorkflowEngine = - ReflectionUtils.getInstanceByClassName("org.apache.falcon.workflow.engine.FalconWorkflowEngine"); - } - return nativeWorkflowEngine; - } - - public static AbstractPolicyBuilderFactory getLifecycleEngine() throws FalconException { - return ReflectionUtils.getInstance(LIFECYCLE_ENGINE); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java deleted file mode 100644 index 2171092..0000000 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.workflow; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; - -/** - * Arguments for workflow execution.
- */ -public enum WorkflowExecutionArgs { - - // instance details - NOMINAL_TIME("nominalTime", "instance time"), - ENTITY_TYPE("entityType", "type of the entity"), - ENTITY_NAME("entityName", "name of the entity"), - TIMESTAMP("timeStamp", "current timestamp"), - - // where - CLUSTER_NAME("cluster", "name of the current cluster"), - OPERATION("operation", "operation like generate, delete, replicate"), - // Exactly same as the above. Introduced to ensure compatibility between messages produced by POST-PROCESSING and - // the values in conf. - DATA_OPERATION("falconDataOperation", "operation like generate, delete, replicate", false), - DATASOURCE_NAME("datasource", "name of the datasource", false), - - // who - WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"), - - // what - // workflow details - USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type", false), - USER_WORKFLOW_NAME("userWorkflowName", "user workflow name", false), - USER_WORKFLOW_VERSION("userWorkflowVersion", "user workflow version", false), - - // workflow execution details - WORKFLOW_ID("workflowId", "current workflow-id of the instance"), - RUN_ID("runId", "current run-id of the instance"), - STATUS("status", "status of the user workflow instance"), - WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie", false), - USER_SUBFLOW_ID("subflowId", "external id of user workflow", false), - PARENT_ID("parentId", "The parent of the current workflow, typically coord action", false), - - WF_START_TIME("workflowStartTime", "workflow start time", false), - WF_END_TIME("workflowEndTime", "workflow end time", false), - WF_DURATION("workflowDuration", "workflow duration", false), - - // what inputs - INPUT_FEED_NAMES("falconInputFeeds", "name of the feeds which are used as inputs", false), - INPUT_FEED_PATHS("falconInPaths", "comma separated input feed instance paths", false), - INPUT_NAMES("falconInputNames", "name of the inputs", false), - INPUT_STORAGE_TYPES("falconInputFeedStorageTypes", "input storage types", false), - - // what outputs - OUTPUT_FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"), - OUTPUT_FEED_PATHS("feedInstancePaths", "comma separated feed instance paths"), - - // broker related parameters - TOPIC_NAME("topicName", "name of the topic to be used to send JMS message", false), - BRKR_IMPL_CLASS("brokerImplClass", "falcon message broker Implementation class"), - BRKR_URL("brokerUrl", "falcon message broker url"), - USER_BRKR_IMPL_CLASS("userBrokerImplClass", "user broker Impl class", false), - USER_BRKR_URL("userBrokerUrl", "user broker url", false), - BRKR_TTL("brokerTTL", "time to live for broker message in sec", false), - USER_JMS_NOTIFICATION_ENABLED("userJMSNotificationEnabled", "Is User notification via JMS enabled?", false), - SYSTEM_JMS_NOTIFICATION_ENABLED("systemJMSNotificationEnabled", "Is system notification via JMS enabled?", false), - - // state maintained - LOG_FILE("logFile", "log file path where feeds to be deleted are recorded", false), - // execution context data recorded - LOG_DIR("logDir", "log dir where lineage can be recorded"), - - CONTEXT_FILE("contextFile", "wf execution context file path where wf properties are recorded", false), - CONTEXT_TYPE("contextType", "wf execution context type, pre or post processing", false), - COUNTERS("counters", "store job counters", false); - - private final String name; - private final String description; - private final boolean isRequired; - -
WorkflowExecutionArgs(String name, String description) { - this(name, description, true); - } - - WorkflowExecutionArgs(String name, String description, boolean isRequired) { - this.name = name; - this.description = description; - this.isRequired = isRequired; - } - - public Option getOption() { - return new Option(this.name, true, this.description); - } - - public String getName() { - return this.name; - } - - public String getDescription() { - return description; - } - - public boolean isRequired() { - return isRequired; - } - - public String getOptionValue(CommandLine cmd) { - return cmd.getOptionValue(this.name); - } - - @Override - public String toString() { - return getName(); - } -}
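For context on the enum above: each constant wraps a commons-cli Option and exposes getOptionValue(CommandLine), so a driver can assemble its Options from the enum values and read parsed arguments back per constant. A minimal sketch of that wiring follows; the harness class name, the setRequired() call and the sample arguments are illustrative assumptions, not part of the deleted file.

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

import org.apache.falcon.workflow.WorkflowExecutionArgs;

/** Hypothetical harness showing how WorkflowExecutionArgs can drive commons-cli parsing. */
public final class WorkflowArgsParserSketch {

    private WorkflowArgsParserSketch() {
    }

    public static CommandLine parse(String[] args) throws ParseException {
        Options options = new Options();
        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
            Option option = arg.getOption();
            // Assumption for this sketch: enforce the enum's isRequired flag at parse time.
            option.setRequired(arg.isRequired());
            options.addOption(option);
        }
        return new GnuParser().parse(options, args); // parser shipped with the commons-cli 1.x line
    }

    public static void main(String[] args) throws ParseException {
        // Example invocation (values are made up): -entityName sample-feed -entityType FEED ...
        CommandLine cmd = parse(args);
        System.out.println(WorkflowExecutionArgs.ENTITY_NAME.getName() + "="
                + WorkflowExecutionArgs.ENTITY_NAME.getOptionValue(cmd));
    }
}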

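A note on the initialisation idiom shared by StartupProperties and StateStoreProperties in this diff: both lazily publish a single instance through AtomicReference.compareAndSet rather than synchronization. A stripped-down, self-contained sketch of that idiom (the class name is hypothetical):

import java.util.concurrent.atomic.AtomicReference;

/** Illustrative stand-in for the lazy singleton pattern used by the deleted properties classes. */
public final class LazySingletonSketch {

    private static final AtomicReference<LazySingletonSketch> INSTANCE =
            new AtomicReference<LazySingletonSketch>();

    private LazySingletonSketch() {
        // Stands in for the expensive work, e.g. loading a properties file.
    }

    public static LazySingletonSketch get() {
        if (INSTANCE.get() == null) {
            // Racing threads may each construct an instance, but only the first
            // compareAndSet wins; losers discard theirs, so callers always see one object.
            INSTANCE.compareAndSet(null, new LazySingletonSketch());
        }
        return INSTANCE.get();
    }
}

The trade-off versus double-checked locking is that a losing thread may build a throw-away instance, which is acceptable when construction has no side effects beyond re-reading a properties file.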
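StateStoreProperties#loadCredentials earlier in this diff resolves every credential key under the current domain prefix first and only then under the "*." wildcard, before running ExpressionHelper.substitute on the value. Below is a self-contained sketch of just that lookup rule; the class name and sample keys are illustrative, and the substitution step is omitted.

import java.util.Properties;

/** Illustrates the "<domain>.<key>" then "*.<key>" fallback used when loading credentials. */
public final class DomainScopedLookupSketch {

    static String resolve(Properties props, String domain, String key) {
        // Domain-qualified entry wins; otherwise fall back to the wildcard entry (may be null).
        return props.getProperty(domain + "." + key, props.getProperty("*." + key));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("*.falcon.statestore.jdbc.username", "falcon");
        props.setProperty("prod.falcon.statestore.jdbc.username", "falcon_prod");

        // Prints "falcon_prod": the domain-qualified key shadows the wildcard.
        System.out.println(resolve(props, "prod", "falcon.statestore.jdbc.username"));
        // Prints "falcon": no "dev." entry exists, so the "*." fallback applies.
        System.out.println(resolve(props, "dev", "falcon.statestore.jdbc.username"));
    }
}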
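Finally, on WorkflowEngineFactory above: a caller either takes the engine on which the entity is already scheduled (getWorkflowEngine(entity)) or forces one by passing ENGINE_PROP in the props map. A hypothetical caller-side sketch; the helper class is not part of Falcon, and the engine name supplied must match AbstractWorkflowEngine#getName() of the configured or native engine, whose concrete values are not shown in this diff.

import java.util.HashMap;
import java.util.Map;

import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;

/** Hypothetical caller showing how a scheduler override reaches the factory. */
public final class EngineSelectionSketch {

    private EngineSelectionSketch() {
    }

    /**
     * Resolves the workflow engine for an entity, optionally forcing a specific engine
     * via the falcon.scheduler property consumed by the factory.
     */
    public static AbstractWorkflowEngine select(Entity entity, String engineName) throws FalconException {
        Map<String, String> props = new HashMap<String, String>();
        if (engineName != null) {
            props.put(WorkflowEngineFactory.ENGINE_PROP, engineName);
        }
        return WorkflowEngineFactory.getWorkflowEngine(entity, props);
    }
}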