http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java 
b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
deleted file mode 100644
index 41c9369..0000000
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.cluster.Location;
-import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Helper to get end points relating to the cluster.
- */
-public final class ClusterHelper {
-    public static final String DEFAULT_BROKER_IMPL_CLASS = 
"org.apache.activemq.ActiveMQConnectionFactory";
-    public static final String WORKINGDIR = "working";
-    public static final String NO_USER_BROKER_URL = "NA";
-
-
-
-    private ClusterHelper() {
-    }
-
-    public static Cluster getCluster(String cluster) throws FalconException {
-        return ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
-    }
-
-    public static Configuration getConfiguration(Cluster cluster) {
-        Configuration conf = new Configuration();
-
-        final String storageUrl = getStorageUrl(cluster);
-        conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
-
-        final String executeEndPoint = getMREndPoint(cluster);
-        conf.set(HadoopClientFactory.MR_JT_ADDRESS_KEY, executeEndPoint);
-        conf.set(HadoopClientFactory.YARN_RM_ADDRESS_KEY, executeEndPoint);
-
-        if (cluster.getProperties() != null) {
-            for (Property prop : cluster.getProperties().getProperties()) {
-                conf.set(prop.getName(), prop.getValue());
-            }
-        }
-
-        return conf;
-    }
-
-    public static String getOozieUrl(Cluster cluster) {
-        return getInterface(cluster, Interfacetype.WORKFLOW).getEndpoint();
-    }
-
-    public static String getStorageUrl(Cluster cluster) {
-        return getNormalizedUrl(cluster, Interfacetype.WRITE);
-    }
-
-    public static String getReadOnlyStorageUrl(Cluster cluster) {
-        return getNormalizedUrl(cluster, Interfacetype.READONLY);
-    }
-
-    public static String getMREndPoint(Cluster cluster) {
-        return getInterface(cluster, Interfacetype.EXECUTE).getEndpoint();
-    }
-
-    public static String getRegistryEndPoint(Cluster cluster) {
-        final Interface catalogInterface = getInterface(cluster, 
Interfacetype.REGISTRY);
-        return catalogInterface == null ? null : 
catalogInterface.getEndpoint();
-    }
-
-    public static String getMessageBrokerUrl(Cluster cluster) {
-        final Interface messageInterface = getInterface(cluster, 
Interfacetype.MESSAGING);
-        return messageInterface == null ? NO_USER_BROKER_URL : 
messageInterface.getEndpoint();
-    }
-
-    public static String getMessageBrokerImplClass(Cluster cluster) {
-        if (cluster.getProperties() != null) {
-            for (Property prop : cluster.getProperties().getProperties()) {
-                if (prop.getName().equals("brokerImplClass")) {
-                    return prop.getValue();
-                }
-            }
-        }
-        return DEFAULT_BROKER_IMPL_CLASS;
-    }
-
-    public static Interface getInterface(Cluster cluster, Interfacetype type) {
-        for (Interface interf : cluster.getInterfaces().getInterfaces()) {
-            if (interf.getType() == type) {
-                return interf;
-            }
-        }
-        return null;
-    }
-
-    private static String getNormalizedUrl(Cluster cluster, Interfacetype 
type) {
-        String normalizedUrl = getInterface(cluster, type).getEndpoint();
-        if (normalizedUrl.endsWith("///")){
-            return normalizedUrl;
-        }
-        String normalizedPath = new Path(normalizedUrl + "/").toString();
-        return normalizedPath.substring(0, normalizedPath.length() - 1);
-    }
-
-
-
-    public static Location getLocation(Cluster cluster, ClusterLocationType 
clusterLocationType) {
-        for (Location loc : cluster.getLocations().getLocations()) {
-            if (loc.getName().equals(clusterLocationType)) {
-                return loc;
-            }
-        }
-        //Mocking the working location FALCON-910
-        if (clusterLocationType.equals(ClusterLocationType.WORKING)) {
-            Location staging = getLocation(cluster, 
ClusterLocationType.STAGING);
-            if (staging != null) {
-                Location working = new Location();
-                working.setName(ClusterLocationType.WORKING);
-                
working.setPath(staging.getPath().charAt(staging.getPath().length() - 1) == '/'
-                        ?
-                        staging.getPath().concat(WORKINGDIR)
-                        :
-                        staging.getPath().concat("/").concat(WORKINGDIR));
-                return working;
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Parsed the cluster object and checks for the working location.
-     *
-     * @param cluster
-     * @return
-     */
-    public static boolean checkWorkingLocationExists(Cluster cluster) {
-        for (Location loc : cluster.getLocations().getLocations()) {
-            if (loc.getName().equals(ClusterLocationType.WORKING)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public static String getPropertyValue(Cluster cluster, String propName) {
-        if (cluster.getProperties() != null) {
-            for (Property prop : cluster.getProperties().getProperties()) {
-                if (prop.getName().equals(propName)) {
-                    return prop.getValue();
-                }
-            }
-        }
-        return null;
-    }
-
-    public static Map<String, String> getHiveProperties(Cluster cluster) {
-        if (cluster.getProperties() != null) {
-            List<Property> properties = 
cluster.getProperties().getProperties();
-            if (properties != null && !properties.isEmpty()) {
-                Map<String, String> hiveProperties = new HashMap<String, 
String>();
-                for (Property prop : properties) {
-                    if (prop.getName().startsWith("hive.")) {
-                        hiveProperties.put(prop.getName(), prop.getValue());
-                    }
-                }
-                return hiveProperties;
-            }
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java 
b/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
deleted file mode 100644
index e4ca91b..0000000
--- a/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.service.ConfigurationChangeListener;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Map of clusters in each colocation/ datacenter.
- */
-public final class ColoClusterRelation implements ConfigurationChangeListener {
-    private static final ConcurrentHashMap<String, Set<String>> 
COLO_CLUSTER_MAP =
-        new ConcurrentHashMap<String, Set<String>>();
-    private static final ColoClusterRelation INSTANCE = new 
ColoClusterRelation();
-
-    private ColoClusterRelation() {
-    }
-
-    public static ColoClusterRelation get() {
-        return INSTANCE;
-    }
-
-    public Set<String> getClusters(String colo) {
-        if (COLO_CLUSTER_MAP.containsKey(colo)) {
-            return COLO_CLUSTER_MAP.get(colo);
-        }
-        return new HashSet<String>();
-    }
-
-    @Override
-    public void onAdd(Entity entity) {
-        if (entity.getEntityType() != EntityType.CLUSTER) {
-            return;
-        }
-
-        Cluster cluster = (Cluster) entity;
-        COLO_CLUSTER_MAP.putIfAbsent(cluster.getColo(), new HashSet<String>());
-        COLO_CLUSTER_MAP.get(cluster.getColo()).add(cluster.getName());
-    }
-
-    @Override
-    public void onRemove(Entity entity) {
-        if (entity.getEntityType() != EntityType.CLUSTER) {
-            return;
-        }
-
-        Cluster cluster = (Cluster) entity;
-        COLO_CLUSTER_MAP.get(cluster.getColo()).remove(cluster.getName());
-        if (COLO_CLUSTER_MAP.get(cluster.getColo()).isEmpty()) {
-            COLO_CLUSTER_MAP.remove(cluster.getColo());
-        }
-    }
-
-    @Override
-    public void onChange(Entity oldEntity, Entity newEntity) throws 
FalconException {
-        if (oldEntity.getEntityType() != EntityType.CLUSTER) {
-            return;
-        }
-        throw new FalconException("change shouldn't be supported on cluster!");
-    }
-
-    @Override
-    public void onReload(Entity entity) throws FalconException {
-        onAdd(entity);
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java 
b/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
deleted file mode 100644
index 51ce898..0000000
--- a/common/src/main/java/org/apache/falcon/entity/DatasourceHelper.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.datasource.Credential;
-import org.apache.falcon.entity.v0.datasource.Credentialtype;
-import org.apache.falcon.entity.v0.datasource.Datasource;
-import org.apache.falcon.entity.v0.datasource.DatasourceType;
-import org.apache.falcon.entity.v0.datasource.Interface;
-import org.apache.falcon.entity.v0.datasource.Interfaces;
-import org.apache.falcon.entity.v0.datasource.Interfacetype;
-import org.apache.falcon.entity.v0.datasource.PasswordAliasType;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.security.CredentialProviderHelper;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-
-/**
- * DataSource entity helper methods.
- */
-
-public final class DatasourceHelper {
-
-    public static final String HADOOP_CREDENTIAL_PROVIDER_FILEPATH = 
"hadoop.security.credential.provider.path";
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(DatasourceHelper.class);
-
-    private static final ConfigurationStore STORE = ConfigurationStore.get();
-
-    public static DatasourceType getDatasourceType(String datasourceName) 
throws FalconException {
-        return getDatasource(datasourceName).getType();
-    }
-
-    private DatasourceHelper() {}
-
-    public static Datasource getDatasource(String datasourceName) throws 
FalconException {
-        return STORE.get(EntityType.DATASOURCE, datasourceName);
-    }
-
-    public static String getReadOnlyEndpoint(Datasource datasource) {
-        return getInterface(datasource, Interfacetype.READONLY);
-    }
-
-    public static String getWriteEndpoint(Datasource datasource) {
-        return getInterface(datasource, Interfacetype.WRITE);
-    }
-
-    /**
-     * Returns user name and password pair as it is specified in the XML. If 
the credential type is
-     * password-file, the path name is returned.
-     *
-     * @param db
-     * @return Credential
-     * @throws FalconException
-     */
-
-    public static Credential getReadPasswordInfo(Datasource db) throws 
FalconException {
-        for (Interface ifs : db.getInterfaces().getInterfaces()) {
-            if ((ifs.getType() == Interfacetype.READONLY) && 
(ifs.getCredential() != null)) {
-                return ifs.getCredential();
-            }
-        }
-        return getDefaultPasswordInfo(db.getInterfaces());
-    }
-
-    public static Credential getWritePasswordInfo(Datasource db) throws 
FalconException {
-        for (Interface ifs : db.getInterfaces().getInterfaces()) {
-            if ((ifs.getType() == Interfacetype.WRITE) && (ifs.getCredential() 
!= null)) {
-                return ifs.getCredential();
-            }
-        }
-        return getDefaultPasswordInfo(db.getInterfaces());
-    }
-
-    /**
-     * Returns user name and actual password pair. If the credential type is 
password-file, then the
-     * password is read from the HDFS file. If the credential type is 
password-text, the clear text
-     * password is returned.
-     *
-     * @param db
-     * @return
-     * @throws FalconException
-     */
-    public static java.util.Properties fetchReadPasswordInfo(Datasource db) 
throws FalconException {
-        Credential cred = getReadPasswordInfo(db);
-        return fetchPasswordInfo(cred);
-    }
-
-    public static java.util.Properties fetchWritePasswordInfo(Datasource db) 
throws FalconException {
-        Credential cred = getWritePasswordInfo(db);
-        return fetchPasswordInfo(cred);
-    }
-
-    public static java.util.Properties fetchPasswordInfo(Credential cred) 
throws FalconException {
-        java.util.Properties p = new java.util.Properties();
-        p.put("user", cred.getUserName());
-        if (cred.getType() == Credentialtype.PASSWORD_TEXT) {
-            p.put("password", cred.getPasswordText());
-        } else if (cred.getType() == Credentialtype.PASSWORD_FILE) {
-            String actualPasswd = 
fetchPasswordInfoFromFile(cred.getPasswordFile());
-            p.put("password", actualPasswd);
-        } else if (cred.getType() == Credentialtype.PASSWORD_ALIAS) {
-            String actualPasswd = 
fetchPasswordInfoFromCredentialStore(cred.getPasswordAlias());
-            p.put("password", actualPasswd);
-        }
-        return p;
-    }
-
-    public static String buildJceksProviderPath(URI credURI) {
-        StringBuilder sb = new StringBuilder();
-        final String credProviderPath = sb.append("jceks:").append("//")
-                .append(credURI.getScheme()).append("@")
-                .append(credURI.getHost())
-                .append(credURI.getPath()).toString();
-        return credProviderPath;
-    }
-
-    /**
-     * Return the Interface endpoint for the interface type specified in the 
argument.
-     *
-     * @param db
-     * @param type - can be read-only or write
-     * @return
-     */
-    private static String getInterface(Datasource db, Interfacetype type) {
-        for(Interface ifs : db.getInterfaces().getInterfaces()) {
-            if (ifs.getType() == type) {
-                return ifs.getEndpoint();
-            }
-        }
-        return null;
-    }
-
-    private static Credential getDefaultPasswordInfo(Interfaces ifs) throws 
FalconException {
-
-        if (ifs.getCredential() != null) {
-            return ifs.getCredential();
-        } else {
-            throw new FalconException("Missing Interfaces default credential");
-        }
-    }
-
-    private static String fetchPasswordInfoFromCredentialStore(final 
PasswordAliasType c) throws FalconException {
-        try {
-            final String credPath = c.getProviderPath();
-            final URI credURI = new URI(credPath);
-            if (StringUtils.isBlank(credURI.getScheme())
-                || StringUtils.isBlank(credURI.getHost())
-                || StringUtils.isBlank(credURI.getPath())) {
-                throw new FalconException("Password alias jceks provider HDFS 
path is incorrect.");
-            }
-            final String alias = c.getAlias();
-            if (StringUtils.isBlank(alias)) {
-                throw new FalconException("Password alias is empty.");
-            }
-
-            final String credProviderPath = buildJceksProviderPath(credURI);
-            LOG.info("Credential provider HDFS path : " + credProviderPath);
-
-            if (CredentialProviderHelper.isProviderAvailable()) {
-                UserGroupInformation ugi = CurrentUser.getProxyUGI();
-                String password = ugi.doAs(new 
PrivilegedExceptionAction<String>() {
-                    public String run() throws Exception {
-                        final Configuration conf = new Configuration();
-                        conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, 
credPath);
-                        
conf.set(CredentialProviderHelper.CREDENTIAL_PROVIDER_PATH, credProviderPath);
-                        FileSystem fs = FileSystem.get(credURI, conf);
-                        if (!fs.exists(new Path(credPath))) {
-                            String msg = String.format("Credential provider 
hdfs path [%s] does not "
-                                   + "exist or access denied!", credPath);
-                            LOG.error(msg);
-                            throw new FalconException(msg);
-                        }
-                        return CredentialProviderHelper.resolveAlias(conf, 
alias);
-                    }
-                });
-                return password;
-            } else {
-                throw new FalconException("Credential Provider is not 
initialized");
-            }
-        } catch (Exception ioe) {
-            String msg = "Exception while trying to fetch credential alias";
-            LOG.error(msg, ioe);
-            throw new FalconException(msg, ioe);
-        }
-    }
-    private static String fetchPasswordInfoFromFile(String passwordFilePath) 
throws FalconException {
-        try {
-            Path path = new Path(passwordFilePath);
-            FileSystem fs = 
HadoopClientFactory.get().createProxiedFileSystem(path.toUri());
-            if (!fs.exists(path)) {
-                throw new IOException("The password file does not exist! "
-                        + passwordFilePath);
-            }
-
-            if (!fs.isFile(path)) {
-                throw new IOException("The password file cannot be a 
directory! "
-                        + passwordFilePath);
-            }
-
-            InputStream is = fs.open(path);
-            StringWriter writer = new StringWriter();
-            try {
-                IOUtils.copy(is, writer);
-                return writer.toString();
-            } finally {
-                IOUtils.closeQuietly(is);
-                IOUtils.closeQuietly(writer);
-                fs.close();
-            }
-        } catch (IOException ioe) {
-            LOG.error("Error reading password file from HDFS : " + ioe);
-            throw new FalconException(ioe);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/EntityNotRegisteredException.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/entity/EntityNotRegisteredException.java
 
b/common/src/main/java/org/apache/falcon/entity/EntityNotRegisteredException.java
deleted file mode 100644
index 40f83e4..0000000
--- 
a/common/src/main/java/org/apache/falcon/entity/EntityNotRegisteredException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Exception thrown by falcon when entity is not registered already in config 
store.
- */
-public class EntityNotRegisteredException extends FalconException {
-
-    public EntityNotRegisteredException(String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java 
b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
deleted file mode 100644
index 96befa1..0000000
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ /dev/null
@@ -1,1085 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.commons.beanutils.PropertyUtils;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Pair;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.WorkflowNameBuilder.WorkflowName;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityGraph;
-import org.apache.falcon.entity.v0.EntityNotification;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
-import org.apache.falcon.entity.v0.datasource.DatasourceType;
-import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.LateInput;
-import org.apache.falcon.entity.v0.process.LateProcess;
-import org.apache.falcon.entity.v0.process.PolicyType;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Retry;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.resource.EntityList;
-import org.apache.falcon.util.DeploymentUtil;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TimeZone;
-
-/**
- * Helper to get entity object.
- */
-public final class EntityUtil {
-    public static final Logger LOG = LoggerFactory.getLogger(EntityUtil.class);
-
-    public static final String MR_QUEUE_NAME = "queueName";
-
-    private static final long MINUTE_IN_MS = 60 * 1000L;
-    private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS;
-    private static final long DAY_IN_MS = 24 * HOUR_IN_MS;
-    private static final long MONTH_IN_MS = 31 * DAY_IN_MS;
-    private static final long ONE_MS = 1;
-    public static final String MR_JOB_PRIORITY = "jobPriority";
-
-    public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
-    public static final String WF_LIB_SEPARATOR = ",";
-    private static final String STAGING_DIR_NAME_SEPARATOR = "_";
-
-    /** Priority with which the DAG will be scheduled.
-     *  Matches the five priorities of Hadoop jobs.
-     */
-    public enum JOBPRIORITY {
-        VERY_HIGH((short) 1),
-        HIGH((short) 2),
-        NORMAL((short) 3),
-        LOW((short) 4),
-        VERY_LOW((short) 5);
-
-        private short priority;
-
-        public short getPriority() {
-            return priority;
-        }
-
-        JOBPRIORITY(short priority) {
-            this.priority = priority;
-        }
-    }
-
-    private EntityUtil() {}
-
-    public static <T extends Entity> T getEntity(EntityType type, String 
entityName) throws FalconException {
-        ConfigurationStore configStore = ConfigurationStore.get();
-        T entity = configStore.get(type, entityName);
-        if (entity == null) {
-            throw new EntityNotRegisteredException(entityName + " (" + type + 
") not found");
-        }
-        return entity;
-    }
-
-    public static <T extends Entity> T getEntity(String type, String 
entityName) throws FalconException {
-        EntityType entityType;
-        try {
-            entityType = EntityType.getEnum(type);
-        } catch (IllegalArgumentException e) {
-            throw new FalconException("Invalid entity type: " + type, e);
-        }
-        return getEntity(entityType, entityName);
-    }
-
-    public static TimeZone getTimeZone(String tzId) {
-        if (tzId == null) {
-            throw new IllegalArgumentException("Invalid TimeZone: Cannot be 
null.");
-        }
-        TimeZone tz = TimeZone.getTimeZone(tzId);
-        if (!tzId.equals("GMT") && tz.getID().equals("GMT")) {
-            throw new IllegalArgumentException("Invalid TimeZone: " + tzId);
-        }
-        return tz;
-    }
-
-    public static Date getEndTime(Entity entity, String cluster) {
-        if (entity.getEntityType() == EntityType.PROCESS) {
-            return getEndTime((Process) entity, cluster);
-        } else {
-            return getEndTime((Feed) entity, cluster);
-        }
-    }
-
-    public static Date parseDateUTC(String dateStr) throws FalconException {
-        try {
-            return SchemaHelper.parseDateUTC(dateStr);
-        } catch (Exception e) {
-            throw new FalconException(e);
-        }
-    }
-
-    public static Date getStartTime(Entity entity, String cluster) {
-        if (entity.getEntityType() == EntityType.PROCESS) {
-            return getStartTime((Process) entity, cluster);
-        } else {
-            return getStartTime((Feed) entity, cluster);
-        }
-    }
-
-    public static Date getEndTime(Process process, String cluster) {
-        org.apache.falcon.entity.v0.process.Cluster processCluster = 
ProcessHelper.getCluster(process, cluster);
-        return processCluster.getValidity().getEnd();
-    }
-
-    public static Date getStartTime(Process process, String cluster) {
-        org.apache.falcon.entity.v0.process.Cluster processCluster = 
ProcessHelper.getCluster(process, cluster);
-        return processCluster.getValidity().getStart();
-    }
-
-    /**
-     * Returns the validity end of a feed on the named cluster.
-     *
-     * @param feed    feed entity
-     * @param cluster name of the cluster, looked up through FeedHelper.getCluster
-     * @return end date of the cluster's validity
-     */
-    public static Date getEndTime(Feed feed, String cluster) {
-        return FeedHelper.getCluster(feed, cluster).getValidity().getEnd();
-    }
-
-    /**
-     * Returns the validity start of a feed on the named cluster.
-     *
-     * @param feed    feed entity
-     * @param cluster name of the cluster, looked up through FeedHelper.getCluster
-     * @return start date of the cluster's validity
-     */
-    public static Date getStartTime(Feed feed, String cluster) {
-        return FeedHelper.getCluster(feed, cluster).getValidity().getStart();
-    }
-
-    /**
-     * Returns the parallelism of an entity.
-     *
-     * Process entities are dispatched to the Process overload; every other entity is cast to Feed.
-     *
-     * @param entity process or feed entity
-     * @return parallelism reported by the type-specific overload
-     */
-    public static int getParallel(Entity entity) {
-        final boolean isProcess = entity.getEntityType() == EntityType.PROCESS;
-        return isProcess ? getParallel((Process) entity) : getParallel((Feed) entity);
-    }
-
-    /**
-     * Sets the validity start of an entity on the named cluster.
-     *
-     * Process entities are dispatched to the Process overload; every other entity is cast to Feed.
-     *
-     * @param entity    process or feed entity
-     * @param cluster   name of the cluster
-     * @param startDate new validity start
-     */
-    public static void setStartDate(Entity entity, String cluster, Date startDate) {
-        if (entity.getEntityType() != EntityType.PROCESS) {
-            setStartDate((Feed) entity, cluster, startDate);
-            return;
-        }
-        setStartDate((Process) entity, cluster, startDate);
-    }
-
-    /**
-     * Sets the validity end of an entity on the named cluster.
-     *
-     * Process entities are dispatched to the Process overload; every other entity is cast to Feed.
-     *
-     * @param entity  process or feed entity
-     * @param cluster name of the cluster
-     * @param endDate new validity end
-     */
-    public static void setEndTime(Entity entity, String cluster, Date endDate) {
-        if (entity.getEntityType() != EntityType.PROCESS) {
-            setEndTime((Feed) entity, cluster, endDate);
-            return;
-        }
-        setEndTime((Process) entity, cluster, endDate);
-    }
-
-    /**
-     * Sets the parallelism of an entity.
-     *
-     * Process entities are dispatched to the Process overload; every other entity is cast to Feed.
-     *
-     * @param entity   process or feed entity
-     * @param parallel parallelism to apply
-     */
-    public static void setParallel(Entity entity, int parallel) {
-        if (entity.getEntityType() != EntityType.PROCESS) {
-            setParallel((Feed) entity, parallel);
-            return;
-        }
-        setParallel((Process) entity, parallel);
-    }
-
-    /**
-     * Returns the parallelism configured on a process.
-     *
-     * @param process process entity
-     * @return the value of {@code process.getParallel()}
-     */
-    public static int getParallel(Process process) {
-        final int parallel = process.getParallel();
-        return parallel;
-    }
-
-    /**
-     * Sets the validity start of a process on the named cluster.
-     *
-     * @param process   process entity
-     * @param cluster   name of the cluster, looked up through ProcessHelper.getCluster
-     * @param startDate new validity start
-     */
-    public static void setStartDate(Process process, String cluster, Date startDate) {
-        ProcessHelper.getCluster(process, cluster).getValidity().setStart(startDate);
-    }
-
-    /**
-     * Sets the parallelism of a process.
-     *
-     * @param process  process entity to update
-     * @param parallel value passed to {@code process.setParallel}
-     */
-    public static void setParallel(Process process, int parallel) {
-        final int requested = parallel;
-        process.setParallel(requested);
-    }
-
-    /**
-     * Sets the validity end of a process on the named cluster.
-     *
-     * @param process process entity
-     * @param cluster name of the cluster, looked up through ProcessHelper.getCluster
-     * @param endDate new validity end
-     */
-    public static void setEndTime(Process process, String cluster, Date endDate) {
-        ProcessHelper.getCluster(process, cluster).getValidity().setEnd(endDate);
-    }
-
-    /**
-     * Returns the parallelism for a feed, which is always 1; the feed argument is not consulted.
-     *
-     * @param feed feed entity (unused)
-     * @return 1
-     */
-    public static int getParallel(Feed feed) {
-        // TODO - how is parallelism supposed to work for feeds?
-        final int feedParallel = 1;
-        return feedParallel;
-    }
-
-    /**
-     * Sets the validity start of a feed on the named cluster.
-     *
-     * @param feed      feed entity
-     * @param cluster   name of the cluster, looked up through FeedHelper.getCluster
-     * @param startDate new validity start
-     */
-    public static void setStartDate(Feed feed, String cluster, Date startDate) {
-        FeedHelper.getCluster(feed, cluster).getValidity().setStart(startDate);
-    }
-
-    /**
-     * Sets the validity end of a feed on the named cluster.
-     *
-     * @param feed    feed entity
-     * @param cluster name of the cluster, looked up through FeedHelper.getCluster
-     * @param endDate new validity end
-     */
-    public static void setEndTime(Feed feed, String cluster, Date endDate) {
-        org.apache.falcon.entity.v0.feed.Cluster clusterDef = FeedHelper.getCluster(feed, cluster);
-        // Must update the END of the validity window; writing the start here would
-        // overwrite the feed's start time and leave the end untouched.
-        clusterDef.getValidity().setEnd(endDate);
-    }
-
-    /**
-     * No-op: the body is empty, so both arguments are ignored for feeds.
-     *
-     * @param feed     feed entity (unused)
-     * @param parallel requested parallelism (unused)
-     */
-    public static void setParallel(Feed feed, int parallel) {
-        // Nothing to do.
-    }
-
-    /**
-     * Returns the frequency of an entity.
-     *
-     * Process entities are dispatched to the Process overload; every other entity is cast to Feed.
-     *
-     * @param entity process or feed entity
-     * @return the entity's frequency
-     */
-    public static Frequency getFrequency(Entity entity) {
-        final boolean isProcess = entity.getEntityType() == EntityType.PROCESS;
-        return isProcess ? getFrequency((Process) entity) : getFrequency((Feed) entity);
-    }
-
-    /**
-     * Returns the frequency configured on a process.
-     *
-     * @param process process entity
-     * @return the value of {@code process.getFrequency()}
-     */
-    public static Frequency getFrequency(Process process) {
-        final Frequency processFrequency = process.getFrequency();
-        return processFrequency;
-    }
-
-    /**
-     * Returns the frequency configured on a feed.
-     *
-     * @param feed feed entity
-     * @return the value of {@code feed.getFrequency()}
-     */
-    public static Frequency getFrequency(Feed feed) {
-        final Frequency feedFrequency = feed.getFrequency();
-        return feedFrequency;
-    }
-
-    /**
-     * Returns the timezone of an entity.
-     *
-     * Process entities are dispatched to the Process overload; every other entity is cast to Feed.
-     *
-     * @param entity process or feed entity
-     * @return the entity's timezone
-     */
-    public static TimeZone getTimeZone(Entity entity) {
-        final boolean isProcess = entity.getEntityType() == EntityType.PROCESS;
-        return isProcess ? getTimeZone((Process) entity) : getTimeZone((Feed) entity);
-    }
-
-    /**
-     * Returns the timezone configured on a process.
-     *
-     * @param process process entity
-     * @return the value of {@code process.getTimezone()}
-     */
-    public static TimeZone getTimeZone(Process process) {
-        final TimeZone processZone = process.getTimezone();
-        return processZone;
-    }
-
-    /**
-     * Returns the timezone configured on a feed.
-     *
-     * @param feed feed entity
-     * @return the value of {@code feed.getTimezone()}
-     */
-    public static TimeZone getTimeZone(Feed feed) {
-        final TimeZone feedZone = feed.getTimezone();
-        return feedZone;
-    }
-
-    /**
-     * Checks whether instanceTime falls exactly on the schedule defined by startTime and frequency.
-     *
-     * The check asks getNextStartTime for the first scheduled time that is not before instanceTime
-     * and compares it with instanceTime. It does not check instanceTime against the entity's validity end.
-     *
-     * @param startTime startTime of the entity
-     * @param frequency frequency of the entity
-     * @param timezone timezone of the entity
-     * @param instanceTime instanceTime to be checked for validity
-     * @return true if getNextStartTime, asked with instanceTime as the reference, returns instanceTime itself
-     */
-    public static boolean isValidInstanceTime(Date startTime, Frequency frequency, TimeZone timezone,
-        Date instanceTime) {
-        return getNextStartTime(startTime, frequency, timezone, instanceTime).equals(instanceTime);
-    }
-
-    /**
-     * Finds the first time on the schedule (startTime, startTime + frequency, ...) that is not before
-     * referenceTime.
-     *
-     * If startTime is after referenceTime, startTime itself is returned.
-     *
-     * NOTE(review): unlike getInstanceSequence, this method does not default a null timezone to UTC - confirm
-     * that callers never pass null.
-     *
-     * @param startTime     time of the first instance
-     * @param frequency     spacing between instances (time unit plus integer count)
-     * @param timezone      timezone used for the calendar arithmetic
-     * @param referenceTime time at or after which the instance is wanted
-     * @return startTime if it is after referenceTime, otherwise the first scheduled time not before referenceTime
-     */
-    public static Date getNextStartTime(Date startTime, Frequency frequency, 
TimeZone timezone, Date referenceTime) {
-        if (startTime.after(referenceTime)) {
-            return startTime;
-        }
-
-        Calendar startCal = Calendar.getInstance(timezone);
-        startCal.setTime(startTime);
-
-        // Estimate how many whole time units separate startTime and referenceTime, using the
-        // fixed-length *_IN_MS constants; any other time unit leaves the estimate at 0.
-        int count = 0;
-        switch (frequency.getTimeUnit()) {
-        case months:
-            count = (int) ((referenceTime.getTime() - startTime.getTime()) / 
MONTH_IN_MS);
-            break;
-        case days:
-            count = (int) ((referenceTime.getTime() - startTime.getTime()) / 
DAY_IN_MS);
-            break;
-        case hours:
-            count = (int) ((referenceTime.getTime() - startTime.getTime()) / 
HOUR_IN_MS);
-            break;
-        case minutes:
-            count = (int) ((referenceTime.getTime() - startTime.getTime()) / 
MINUTE_IN_MS);
-            break;
-        default:
-        }
-
-        final int freq = frequency.getFrequencyAsInt();
-        // Jump ahead in one step by a multiple of freq, so the calendar stays on the schedule.
-        // The jump is based on (count - 2) rather than count; presumably the margin keeps the
-        // approximate estimate from overshooting referenceTime - TODO confirm.
-        if (count > 2) {
-            startCal.add(frequency.getTimeUnit().getCalendarUnit(), ((count - 
2) / freq) * freq);
-        }
-        // Walk forward one instance at a time until the calendar is no longer before referenceTime.
-        while (startCal.getTime().before(referenceTime)) {
-            startCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
-        }
-        return startCal.getTime();
-    }
-
-
-    /**
-     * Copies the custom properties declared on a cluster, feed or process into a {@link Properties} object.
-     *
-     * @param myEntity cluster, feed or process entity
-     * @return name/value pairs from the entity's properties section; empty if the entity declares none
-     * @throws IllegalArgumentException for any other entity type
-     */
-    public static Properties getEntityProperties(Entity myEntity) {
-        final Properties flattened = new Properties();
-        switch (myEntity.getEntityType()) {
-        case CLUSTER:
-            org.apache.falcon.entity.v0.cluster.Properties clusterSection =
-                ((Cluster) myEntity).getProperties();
-            if (clusterSection == null) {
-                break;
-            }
-            for (Property clusterProp : clusterSection.getProperties()) {
-                flattened.put(clusterProp.getName(), clusterProp.getValue());
-            }
-            break;
-
-        case FEED:
-            org.apache.falcon.entity.v0.feed.Properties feedSection = ((Feed) myEntity).getProperties();
-            if (feedSection == null) {
-                break;
-            }
-            for (org.apache.falcon.entity.v0.feed.Property feedProp : feedSection.getProperties()) {
-                flattened.put(feedProp.getName(), feedProp.getValue());
-            }
-            break;
-
-        case PROCESS:
-            org.apache.falcon.entity.v0.process.Properties processSection =
-                ((Process) myEntity).getProperties();
-            if (processSection == null) {
-                break;
-            }
-            for (org.apache.falcon.entity.v0.process.Property processProp : processSection.getProperties()) {
-                flattened.put(processProp.getName(), processProp.getValue());
-            }
-            break;
-
-        default:
-            throw new IllegalArgumentException("Unhandled entity type " + myEntity.getEntityType());
-        }
-        return flattened;
-    }
-
-
-    /**
-     * Computes the 1-based sequence number of the first scheduled instance that is not before instanceTime.
-     *
-     * The instance at startTime has sequence 1. For an instanceTime that falls between two scheduled
-     * instances, the sequence of the later one is returned.
-     *
-     * @param startTime    time of the first instance
-     * @param frequency    spacing between instances (time unit plus integer count)
-     * @param tz           timezone for the calendar arithmetic; UTC is used when null
-     * @param instanceTime time whose sequence number is wanted
-     * @return -1 if startTime is after instanceTime, otherwise the sequence number described above
-     */
-    public static int getInstanceSequence(Date startTime, Frequency frequency, 
TimeZone tz, Date instanceTime) {
-        if (startTime.after(instanceTime)) {
-            return -1;
-        }
-
-        if (tz == null) {
-            tz = TimeZone.getTimeZone("UTC");
-        }
-
-        Calendar startCal = Calendar.getInstance(tz);
-        startCal.setTime(startTime);
-
-        // Estimate how many whole time units separate startTime and instanceTime, using the
-        // fixed-length *_IN_MS constants; any other time unit leaves the estimate at 0.
-        int count = 0;
-        switch (frequency.getTimeUnit()) {
-        case months:
-            count = (int) ((instanceTime.getTime() - startTime.getTime()) / 
MONTH_IN_MS);
-            break;
-        case days:
-            count = (int) ((instanceTime.getTime() - startTime.getTime()) / 
DAY_IN_MS);
-            break;
-        case hours:
-            count = (int) ((instanceTime.getTime() - startTime.getTime()) / 
HOUR_IN_MS);
-            break;
-        case minutes:
-            count = (int) ((instanceTime.getTime() - startTime.getTime()) / 
MINUTE_IN_MS);
-            break;
-        default:
-        }
-
-        final int freq = frequency.getFrequencyAsInt();
-        if (count > 2) {
-            // Jump the calendar ahead by a whole number of instances, and from here on
-            // let count hold the number of instances skipped rather than time units.
-            startCal.add(frequency.getTimeUnit().getCalendarUnit(), (count / 
freq) * freq);
-            count = (count / freq);
-        } else {
-            // Small gap: count every instance from startTime in the loop below.
-            count = 0;
-        }
-        // Step one instance at a time until the calendar is no longer before instanceTime.
-        while (startCal.getTime().before(instanceTime)) {
-            startCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
-            count++;
-        }
-        // count is the number of steps taken from startTime; sequence numbers start at 1.
-        return count + 1;
-    }
-
-    /**
-     * Returns the time that lies instanceCount frequency periods after instanceTime.
-     *
-     * @param instanceTime  time to start from
-     * @param frequency     spacing between instances (time unit plus integer count)
-     * @param tz            timezone for the calendar arithmetic; UTC is used when null
-     * @param instanceCount number of periods to add
-     * @return instanceTime shifted by frequency * instanceCount
-     */
-    public static Date getNextInstanceTime(Date instanceTime, Frequency frequency, TimeZone tz, int instanceCount) {
-        final TimeZone zone = (tz != null) ? tz : TimeZone.getTimeZone("UTC");
-        final Calendar cursor = Calendar.getInstance(zone);
-        cursor.setTime(instanceTime);
-
-        final int unitsToAdd = frequency.getFrequencyAsInt() * instanceCount;
-        cursor.add(frequency.getTimeUnit().getCalendarUnit(), unitsToAdd);
-        return cursor.getTime();
-    }
-
-    /**
-     * Computes the hex-encoded MD5 digest of the entity's flattened string form (see stringOf).
-     *
-     * @param entity entity to fingerprint
-     * @return MD5 of {@code stringOf(entity)} as a hex string
-     * @throws FalconException if the entity cannot be flattened
-     */
-    public static String md5(Entity entity) throws FalconException {
-        final String flattened = stringOf(entity);
-        final char[] hexDigest = Hex.encodeHex(DigestUtils.md5(flattened));
-        return new String(hexDigest);
-    }
-
-    /**
-     * Compares two entities without excluding any properties.
-     *
-     * @param lhs first entity, may be null
-     * @param rhs second entity, may be null
-     * @return result of the three-argument overload called with a null filter
-     * @throws FalconException if either entity cannot be flattened
-     */
-    public static boolean equals(Entity lhs, Entity rhs) throws FalconException {
-        final String[] noFilter = null;
-        return equals(lhs, rhs, noFilter);
-    }
-
-    /**
-     * Compares two entities by their flattened string forms, skipping properties matched by filterProps.
-     *
-     * Two nulls are equal; a null and a non-null are not. Otherwise the entities must first satisfy
-     * {@code lhs.equals(rhs)} and then produce identical flattened strings.
-     *
-     * @param lhs         first entity, may be null
-     * @param rhs         second entity, may be null
-     * @param filterProps property name patterns to leave out of the comparison, may be null
-     * @return true if the entities are considered equal
-     * @throws FalconException if either entity cannot be flattened
-     */
-    public static boolean equals(Entity lhs, Entity rhs, String[] filterProps) throws FalconException {
-        if (lhs == null || rhs == null) {
-            // Equal only when both sides are null.
-            return lhs == rhs;
-        }
-        if (!lhs.equals(rhs)) {
-            return false;
-        }
-        final String left = stringOf(lhs, filterProps);
-        final String right = stringOf(rhs, filterProps);
-        return left.equals(right);
-    }
-
-    /**
-     * Flattens an entity into its sorted "key=value" string form without excluding any properties.
-     *
-     * @param entity entity to flatten
-     * @return result of the two-argument overload called with a null filter
-     * @throws FalconException if the entity cannot be flattened
-     */
-    public static String stringOf(Entity entity) throws FalconException {
-        final String[] noFilter = null;
-        return stringOf(entity, noFilter);
-    }
-
-    /**
-     * Flattens an entity into one "key=value" line per property, with the keys in sorted order.
-     *
-     * @param entity      entity to flatten
-     * @param filterProps property name patterns to leave out, may be null
-     * @return newline-terminated "key=value" lines sorted by key
-     * @throws FalconException if mapToProperties fails
-     */
-    private static String stringOf(Entity entity, String[] filterProps) throws FalconException {
-        final Map<String, String> flattened = new HashMap<String, String>();
-        mapToProperties(entity, null, flattened, filterProps);
-
-        // Sort the keys so the output does not depend on HashMap iteration order.
-        final String[] sortedKeys = flattened.keySet().toArray(new String[flattened.size()]);
-        Arrays.sort(sortedKeys);
-
-        final StringBuilder out = new StringBuilder();
-        for (int i = 0; i < sortedKeys.length; i++) {
-            out.append(sortedKeys[i]).append('=').append(flattened.get(sortedKeys[i])).append('\n');
-        }
-        return out.toString();
-    }
-
-    /**
-     * Recursively flattens an object graph into name/value string pairs.
-     *
-     * Nested bean properties get dotted names ("parent.child") and list elements get indexed names
-     * ("name[0]"). A null object, or a name matched by one of the filters, contributes nothing.
-     *
-     * @param obj         object to flatten, may be null
-     * @param name        property path of obj so far; null for the root object
-     * @param propMap     map that receives the flattened pairs
-     * @param filterProps property paths to skip, may be null; '.', '[' and ']' are escaped before each
-     *                    entry is used as a regular expression, any other regex metacharacters stay active
-     * @throws FalconException wrapping any failure while invoking toString or describing the bean
-     */
-    @SuppressWarnings("rawtypes")
-    private static void mapToProperties(Object obj, String name, Map<String, 
String> propMap, String[] filterProps)
-        throws FalconException {
-
-        if (obj == null) {
-            return;
-        }
-
-        // Skip this property (and therefore everything beneath it) if its path matches a filter.
-        if (filterProps != null && name != null) {
-            for (String filter : filterProps) {
-                if (name.matches(filter.replace(".", "\\.").replace("[", 
"\\[").replace("]", "\\]"))) {
-                    return;
-                }
-            }
-        }
-
-        // Leaf types are written out directly; lists and beans are recursed into.
-        if (Date.class.isAssignableFrom(obj.getClass())) {
-            propMap.put(name, SchemaHelper.formatDateUTC((Date) obj));
-        } else if (obj.getClass().getPackage().getName().equals("java.lang")) {
-            propMap.put(name, String.valueOf(obj));
-        } else if (TimeZone.class.isAssignableFrom(obj.getClass())) {
-            propMap.put(name, ((TimeZone) obj).getID());
-        } else if (Enum.class.isAssignableFrom(obj.getClass())) {
-            propMap.put(name, ((Enum) obj).name());
-        } else if (List.class.isAssignableFrom(obj.getClass())) {
-            List list = (List) obj;
-            for (int index = 0; index < list.size(); index++) {
-                mapToProperties(list.get(index), name + "[" + index + "]", 
propMap, filterProps);
-            }
-        } else {
-            try {
-                // A class that declares its own toString() is treated as a leaf value.
-                Method method = obj.getClass().getDeclaredMethod("toString");
-                propMap.put(name, (String) method.invoke(obj));
-            } catch (NoSuchMethodException e) {
-                // No toString() declared on the class itself: treat it as a bean and recurse
-                // into each of its properties.
-                try {
-                    Map map = PropertyUtils.describe(obj);
-                    for (Object entry : map.entrySet()) {
-                        String key = (String)((Map.Entry)entry).getKey();
-                        if (!key.equals("class")) {
-                            mapToProperties(map.get(key), name != null ? name 
+ "." + key : key, propMap,
-                                    filterProps);
-                        } else {
-                            // Just add the parent element to the list too.
-                            // Required to detect addition/removal of optional elements with child nodes.
-                            // For example, late-process
-                            propMap.put(((Class)map.get(key)).getSimpleName(), 
"");
-                        }
-                    }
-                } catch (Exception e1) {
-                    throw new FalconException(e1);
-                }
-            } catch (Exception e) {
-                throw new FalconException(e);
-            }
-        }
-    }
-
-    /**
-     * Builds the workflow name for an entity from a tag and a list of suffixes.
-     *
-     * @param tag      tag to set on the builder, may be null
-     * @param suffixes suffixes to set on the builder, may be null
-     * @param entity   entity the workflow belongs to
-     * @return the name produced by WorkflowNameBuilder
-     */
-    public static WorkflowName getWorkflowName(Tag tag, List<String> suffixes,
-                                               Entity entity) {
-        final WorkflowNameBuilder<Entity> nameBuilder = new WorkflowNameBuilder<Entity>(entity);
-        nameBuilder.setTag(tag);
-        nameBuilder.setSuffixes(suffixes);
-        final WorkflowName workflowName = nameBuilder.getWorkflowName();
-        return workflowName;
-    }
-
-    /**
-     * Builds the workflow name for an entity from a tag, with no suffixes.
-     *
-     * @param tag    tag to set on the builder
-     * @param entity entity the workflow belongs to
-     * @return result of the three-argument overload called with null suffixes
-     */
-    public static WorkflowName getWorkflowName(Tag tag, Entity entity) {
-        final List<String> noSuffixes = null;
-        return getWorkflowName(tag, noSuffixes, entity);
-    }
-
-    /**
-     * Builds the workflow name for an entity with neither a tag nor suffixes.
-     *
-     * @param entity entity the workflow belongs to
-     * @return result of the three-argument overload called with a null tag and null suffixes
-     */
-    public static WorkflowName getWorkflowName(Entity entity) {
-        final Tag noTag = null;
-        final List<String> noSuffixes = null;
-        return getWorkflowName(noTag, noSuffixes, entity);
-    }
-
-    /**
-     * Extracts the suffix portion of a workflow name, with every underscore removed.
-     *
-     * @param workflowName workflow name to inspect
-     * @param entity       entity the workflow belongs to
-     * @return suffixes reported by WorkflowNameBuilder with all "_" characters stripped
-     * @throws FalconException declared by this method's signature
-     */
-    public static String getWorkflowNameSuffix(String workflowName,
-                                               Entity entity) throws FalconException {
-        final WorkflowNameBuilder<Entity> nameBuilder = new WorkflowNameBuilder<Entity>(entity);
-        final String rawSuffixes = nameBuilder.getWorkflowSuffixes(workflowName);
-        return rawSuffixes.replaceAll("_", "");
-    }
-
-    /**
-     * Extracts the tag from a workflow name.
-     *
-     * @param workflowName workflow name to inspect
-     * @param entity       entity the workflow belongs to
-     * @return tag reported by WorkflowNameBuilder for the given name
-     */
-    public static Tag getWorkflowNameTag(String workflowName, Entity entity) {
-        final WorkflowNameBuilder<Entity> nameBuilder = new WorkflowNameBuilder<Entity>(entity);
-        final Tag tag = nameBuilder.getWorkflowTag(workflowName);
-        return tag;
-    }
-
-    /**
-     * Lists the workflow names that exist for an entity.
-     *
-     * A feed yields its RETENTION and REPLICATION workflow names, a process yields its DEFAULT one.
-     *
-     * @param entity feed or process entity
-     * @return workflow names as strings
-     * @throws IllegalArgumentException for any other entity type
-     */
-    public static List<String> getWorkflowNames(Entity entity) {
-        switch(entity.getEntityType()) {
-        case FEED:
-            final String retention = getWorkflowName(Tag.RETENTION, entity).toString();
-            final String replication = getWorkflowName(Tag.REPLICATION, entity).toString();
-            return Arrays.asList(retention, replication);
-
-        case PROCESS:
-            final String defaultName = getWorkflowName(Tag.DEFAULT, entity).toString();
-            return Arrays.asList(defaultName);
-
-        default:
-            throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
-        }
-    }
-
-    /**
-     * Returns the entity as seen from a single cluster.
-     *
-     * A cluster entity is returned as is. A feed or process is copied, and cluster definitions that are
-     * not relevant to clusterName are removed from the copy; the entity passed in is not modified.
-     *
-     * @param entity      cluster, feed or process entity
-     * @param clusterName name of the cluster whose view is wanted
-     * @return the entity itself for a cluster, otherwise a trimmed copy
-     * @throws UnsupportedOperationException for any other entity type
-     */
-    public static <T extends Entity> T getClusterView(T entity, String 
clusterName) {
-        switch (entity.getEntityType()) {
-        case CLUSTER:
-            return entity;
-
-        case FEED:
-            Feed feed = (Feed) entity.copy();
-            org.apache.falcon.entity.v0.feed.Cluster feedCluster = 
FeedHelper.getCluster(feed, clusterName);
-            Iterator<org.apache.falcon.entity.v0.feed.Cluster> itr = 
feed.getClusters().getClusters().iterator();
-            while (itr.hasNext()) {
-                org.apache.falcon.entity.v0.feed.Cluster cluster = itr.next();
-                // Keep the requested cluster. If the requested cluster is a TARGET,
-                // also keep every SOURCE cluster. Everything else is removed.
-                // NOTE(review): feedCluster is dereferenced without a null check - confirm
-                // FeedHelper.getCluster cannot return null for an unknown clusterName.
-                if (!(cluster.getName().equals(clusterName)
-                        || (feedCluster.getType() == ClusterType.TARGET
-                        && cluster.getType() == ClusterType.SOURCE))) {
-                    itr.remove();
-                }
-            }
-            return (T) feed;
-
-        case PROCESS:
-            Process process = (Process) entity.copy();
-            Iterator<org.apache.falcon.entity.v0.process.Cluster> procItr =
-                process.getClusters().getClusters().iterator();
-            while (procItr.hasNext()) {
-                org.apache.falcon.entity.v0.process.Cluster cluster = 
procItr.next();
-                // A process view keeps only the requested cluster.
-                if (!cluster.getName().equals(clusterName)) {
-                    procItr.remove();
-                }
-            }
-            return (T) process;
-        default:
-        }
-        throw new UnsupportedOperationException("Not supported for entity type 
" + entity.getEntityType());
-    }
-
-    /**
-     * Collects the names of the clusters an entity is defined on.
-     *
-     * A cluster entity contributes its own name; a feed or process contributes the name of every cluster
-     * in its clusters section. Any other entity type yields an empty set.
-     *
-     * @param entity entity to inspect
-     * @return set of cluster names, possibly empty
-     */
-    public static Set<String> getClustersDefined(Entity entity) {
-        final Set<String> names = new HashSet<String>();
-        switch (entity.getEntityType()) {
-        case CLUSTER:
-            names.add(entity.getName());
-            break;
-
-        case FEED:
-            for (org.apache.falcon.entity.v0.feed.Cluster feedCluster
-                    : ((Feed) entity).getClusters().getClusters()) {
-                names.add(feedCluster.getName());
-            }
-            break;
-
-        case PROCESS:
-            for (org.apache.falcon.entity.v0.process.Cluster processCluster
-                    : ((Process) entity).getClusters().getClusters()) {
-                names.add(processCluster.getName());
-            }
-            break;
-
-        default:
-        }
-        return names;
-    }
-
-    /**
-     * Returns the entity's clusters, restricted to the clusters of the current deployment.
-     *
-     * In embedded mode every cluster of the entity is returned. Otherwise only those that also appear in
-     * {@code DeploymentUtil.getCurrentClusters()} are kept.
-     *
-     * @param entity entity to inspect
-     * @return names of the applicable clusters
-     */
-    public static Set<String> getClustersDefinedInColos(Entity entity) {
-        final Set<String> definedOnEntity = EntityUtil.getClustersDefined(entity);
-        if (DeploymentUtil.isEmbeddedMode()) {
-            return definedOnEntity;
-        }
-
-        final Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
-        final Set<String> applicable = new HashSet<String>();
-        for (String candidate : definedOnEntity) {
-            if (!currentClusters.contains(candidate)) {
-                continue;
-            }
-            applicable.add(candidate);
-        }
-        return applicable;
-    }
-
-    /**
-     * Returns the retry policy for an entity.
-     *
-     * For a feed the policy is built from the runtime properties feed.retry.attempts, feed.retry.frequency,
-     * feed.retry.policy and feed.retry.onTimeout, unless feed.retry.allowed is not "true", in which case
-     * null is returned. For a process the retry declared on the process is returned.
-     *
-     * @param entity feed or process entity
-     * @return retry policy, or null when feed retries are disabled
-     * @throws FalconException for any other entity type
-     */
-    public static Retry getRetry(Entity entity) throws FalconException {
-        switch (entity.getEntityType()) {
-        case FEED:
-            final String retryAllowed = RuntimeProperties.get().getProperty("feed.retry.allowed", "true");
-            if (!retryAllowed.equalsIgnoreCase("true")) {
-                return null;
-            }
-            final Retry feedRetry = new Retry();
-
-            final String attempts = RuntimeProperties.get().getProperty("feed.retry.attempts", "3");
-            feedRetry.setAttempts(Integer.parseInt(attempts));
-
-            final String delay = RuntimeProperties.get().getProperty("feed.retry.frequency", "minutes(5)");
-            feedRetry.setDelay(new Frequency(delay));
-
-            final String policy = RuntimeProperties.get().getProperty("feed.retry.policy", "exp-backoff");
-            feedRetry.setPolicy(PolicyType.fromValue(policy));
-
-            final String onTimeout = RuntimeProperties.get().getProperty("feed.retry.onTimeout", "false");
-            feedRetry.setOnTimeout(Boolean.valueOf(onTimeout));
-            return feedRetry;
-        case PROCESS:
-            return ((Process) entity).getRetry();
-        default:
-            throw new FalconException("Cannot create Retry for entity:" + entity.getName());
-        }
-    }
-
-    /**
-     * Returns the base staging path for an entity on a cluster.
-     *
-     * The staging path stores scheduler configs such as oozie coord/bundle xmls and the parent workflow xml.
-     * Each entity update creates a new staging path; the base staging path is the parent of all of them:
-     * {@code <cluster staging location>/falcon/workflows/<entity type in lower case>/<entity name>}.
-     *
-     * @param cluster cluster whose STAGING location is used
-     * @param entity  entity the staging directories belong to
-     * @return base staging path
-     */
-    public static Path getBaseStagingPath(Cluster cluster, Entity entity) {
-        final String entityTypeDir = entity.getEntityType().name().toLowerCase();
-        final String relativePath = "falcon/workflows/" + entityTypeDir + "/" + entity.getName();
-        return new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath(), relativePath);
-    }
-
-    /**
-     * Gets the latest staging path for an entity on a cluster, based on the dir name (that contains timestamp).
-     *
-     * Only directories whose name starts with the md5 of the entity's cluster view are considered.
-     *
-     * @param cluster cluster whose staging area is searched
-     * @param entity  entity whose staging directory is wanted
-     * @return path of the matching staging directory that sorts last
-     * @throws FalconException if the listing fails, or if no matching staging directory exists
-     */
-    public static Path getLatestStagingPath(org.apache.falcon.entity.v0.cluster.Cluster cluster, final Entity entity)
-        throws FalconException {
-        Path basePath = getBaseStagingPath(cluster, entity);
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster));
-        final FileStatus[] files;
-        try {
-            final String md5 = md5(getClusterView(entity, cluster.getName()));
-            files = fs.listStatus(basePath, new PathFilter() {
-                @Override
-                public boolean accept(Path path) {
-                    return path.getName().startsWith(md5);
-                }
-            });
-        } catch (Exception e) {
-            throw new FalconException("Unable get listing for " + basePath.toString(), e);
-        }
-
-        // Report "not found" outside the try block, so it is not caught above and
-        // re-wrapped as a misleading listing failure.
-        if (files == null || files.length == 0) {
-            throw new FalconException("No staging directories found for entity " + entity.getName() + " on cluster "
-                + cluster.getName());
-        }
-
-        // Find the latest directory using the timestamp used in the dir name.
-        // These files will vary only in ts suffix (as we have filtered out using a common md5 hash),
-        // hence, sorting will be on timestamp.
-        // FileStatus compares on Path and hence the latest will be at the end after sorting.
-        Arrays.sort(files);
-        return files[files.length - 1].getPath();
-    }
-
-    /**
-     * Creates a new staging path for an entity schedule/update.
-     *
-     * The directory name is the md5 of the entity's cluster view, followed by STAGING_DIR_NAME_SEPARATOR and
-     * the current time in milliseconds. The md5 is what lets a later check decide if an update is required.
-     *
-     * @param cluster cluster the entity is being staged on
-     * @param entity  entity being scheduled or updated
-     * @return new staging path beneath the entity's base staging path
-     * @throws FalconException if the md5 of the cluster view cannot be computed
-     */
-    public static Path getNewStagingPath(Cluster cluster, Entity entity)
-        throws FalconException {
-        final Entity viewOnCluster = getClusterView(entity, cluster.getName());
-        final Path stagingRoot = getBaseStagingPath(cluster, entity);
-        final String dirName = md5(viewOnCluster) + STAGING_DIR_NAME_SEPARATOR + System.currentTimeMillis();
-        return new Path(stagingRoot, dirName);
-    }
-
-    /**
-     * Given an entity and a cluster, determines if the supplied path is a staging path for that entity.
-     *
-     * The path qualifies when it exists on the cluster's file system, starts with the cluster's STAGING
-     * location and contains {@code <entity type in lower case>/<entity name>}.
-     *
-     * @param cluster cluster whose staging location and file system are used
-     * @param entity  entity the path should belong to
-     * @param path    path to test
-     * @return true if all three conditions hold
-     * @throws FalconException wrapping an IOException from the file system
-     */
-    public static boolean isStagingPath(Cluster cluster,
-                                        Entity entity, Path path) throws FalconException {
-        final String stagingRoot =
-            new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath()).toUri().getPath();
-        try {
-            final FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(
-                    ClusterHelper.getConfiguration(cluster));
-            final String candidate = path.toUri().getPath();
-            final String entitySegment = entity.getEntityType().name().toLowerCase() + "/" + entity.getName();
-            if (!fileSystem.exists(path)) {
-                return false;
-            }
-            return candidate.startsWith(stagingRoot) && candidate.contains(entitySegment);
-        } catch (IOException ioe) {
-            throw new FalconException(ioe);
-        }
-    }
-
-    /**
-     * Returns the late-data handling definition for an entity.
-     *
-     * For a feed, null is returned when the runtime property feed.late.allowed is not "true" or when the
-     * feed has no late-arrival section. Otherwise a LateProcess is built from the runtime properties
-     * feed.late.frequency and feed.late.policy, with the feed itself as the single late input.
-     * For a process, the late process declared on the process is returned.
-     *
-     * @param entity feed or process entity
-     * @return late process definition, or null when late handling does not apply to the feed
-     * @throws FalconException for any other entity type
-     */
-    public static LateProcess getLateProcess(Entity entity)
-        throws FalconException {
-
-        switch (entity.getEntityType()) {
-        case FEED:
-            final String lateAllowed = RuntimeProperties.get().getProperty("feed.late.allowed", "true");
-            if (!lateAllowed.equalsIgnoreCase("true")) {
-                return null;
-            }
-
-            // Nothing to build when late arrival is not configured on the feed.
-            if (((Feed) entity).getLateArrival() == null) {
-                return null;
-            }
-
-            final LateProcess feedLateProcess = new LateProcess();
-
-            final String delay = RuntimeProperties.get().getProperty("feed.late.frequency", "hours(3)");
-            feedLateProcess.setDelay(new Frequency(delay));
-
-            final String policy = RuntimeProperties.get().getProperty("feed.late.policy", "exp-backoff");
-            feedLateProcess.setPolicy(PolicyType.fromValue(policy));
-
-            final LateInput selfInput = new LateInput();
-            selfInput.setInput(entity.getName());
-            //TODO - Assuming the late workflow is not used
-            selfInput.setWorkflowPath("ignore.xml");
-            feedLateProcess.getLateInputs().add(selfInput);
-            return feedLateProcess;
-        case PROCESS:
-            return ((Process) entity).getLateProcess();
-        default:
-            throw new FalconException("Cannot create Late Process for entity:" + entity.getName());
-        }
-    }
-
-    /**
-     * Returns the "logs" directory beneath the entity's base staging path.
-     *
-     * @param cluster cluster whose staging location is used
-     * @param entity  entity the logs belong to
-     * @return {@code <base staging path>/logs}
-     */
-    public static Path getLogPath(Cluster cluster, Entity entity) {
-        final Path stagingRoot = getBaseStagingPath(cluster, entity);
-        return new Path(stagingRoot, "logs");
-    }
-
-    /**
-     * Converts a date string of the form yyyy-MM-ddTHH:mmZ into the form yyyy-MM-dd-HH-mm.
-     *
-     * @param utc date string to convert; the trailing "Z" is matched as a literal character
-     * @return the same date and time with "-" as the only separator
-     * @throws FalconException if the input does not match the expected pattern
-     */
-    public static String fromUTCtoURIDate(String utc) throws FalconException {
-        final DateFormat inputFormat = new SimpleDateFormat("yyyy'-'MM'-'dd'T'HH':'mm'Z'");
-        final Date parsed;
-        try {
-            parsed = inputFormat.parse(utc);
-        } catch (ParseException pe) {
-            throw new FalconException("Unable to parse utc date:", pe);
-        }
-        final DateFormat outputFormat = new SimpleDateFormat("yyyy'-'MM'-'dd'-'HH'-'mm");
-        return outputFormat.format(parsed);
-    }
-
-    /**
-     * Tells whether this server is responsible for the given colo.
-     *
-     * @param colo colo name to check
-     * @return true in embedded mode; false on a prism; otherwise whether colo equals the current colo
-     */
-    public static boolean responsibleFor(String colo) {
-        if (DeploymentUtil.isEmbeddedMode()) {
-            return true;
-        }
-        if (DeploymentUtil.isPrism()) {
-            return false;
-        }
-        return colo.equals(DeploymentUtil.getCurrentColo());
-    }
-
-    /**
-     * Finds the first scheduled time of a feed or process on a cluster that is not before effectiveTime.
-     *
-     * The entity's validity start on that cluster, its frequency and its timezone are passed on to the
-     * date-based overload.
-     *
-     * @param entity        feed or process entity
-     * @param cluster       cluster whose validity start is used
-     * @param effectiveTime reference time
-     * @return result of the date-based overload
-     * @throws IllegalArgumentException for any other entity type
-     */
-    public static Date getNextStartTime(Entity entity, Cluster cluster, Date effectiveTime) {
-        switch(entity.getEntityType()) {
-        case FEED:
-            final Feed asFeed = (Feed) entity;
-            final Date feedStart = FeedHelper.getCluster(asFeed, cluster.getName()).getValidity().getStart();
-            return getNextStartTime(feedStart, asFeed.getFrequency(), asFeed.getTimezone(), effectiveTime);
-
-        case PROCESS:
-            final Process asProcess = (Process) entity;
-            final Date processStart =
-                ProcessHelper.getCluster(asProcess, cluster.getName()).getValidity().getStart();
-            return getNextStartTime(processStart, asProcess.getFrequency(), asProcess.getTimezone(), effectiveTime);
-
-        default:
-            throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
-        }
-    }
-
-    /**
-     * Tells whether an entity uses TABLE storage on the given cluster.
-     *
-     * Process entities are dispatched to the Process overload; every other entity is cast to Feed.
-     *
-     * @param cluster cluster to check against
-     * @param entity  process or feed entity
-     * @return true if the storage type is TABLE
-     * @throws FalconException propagated from the type-specific overload
-     */
-    public static boolean isTableStorageType(Cluster cluster, Entity entity) throws FalconException {
-        if (entity.getEntityType() == EntityType.PROCESS) {
-            return isTableStorageType(cluster, (Process) entity);
-        }
-        return isTableStorageType(cluster, (Feed) entity);
-    }
-
-    /**
-     * Tells whether a feed uses TABLE storage on the given cluster.
-     *
-     * @param cluster cluster to check against
-     * @param feed    feed entity
-     * @return true if FeedHelper reports Storage.TYPE.TABLE
-     * @throws FalconException propagated from FeedHelper.getStorageType
-     */
-    public static boolean isTableStorageType(Cluster cluster, Feed feed) throws FalconException {
-        return Storage.TYPE.TABLE == FeedHelper.getStorageType(feed, cluster);
-    }
-
-    /**
-     * Tells whether a process uses TABLE storage on the given cluster.
-     *
-     * @param cluster cluster to check against
-     * @param process process entity
-     * @return true if ProcessHelper reports Storage.TYPE.TABLE
-     * @throws FalconException propagated from ProcessHelper.getStorageType
-     */
-    public static boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
-        return Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process);
-    }
-
-    /**
-     * Splits the comma-separated tags of a process, feed or cluster into a list of trimmed strings.
-     *
-     * @param entity entity whose tags are wanted
-     * @return list of tags; empty if the entity has no tags or is of another type
-     */
-    public static List<String> getTags(Entity entity) {
-        final String commaSeparated;
-        switch (entity.getEntityType()) {
-        case PROCESS:
-            commaSeparated = ((Process) entity).getTags();
-            break;
-
-        case FEED:
-            commaSeparated = ((Feed) entity).getTags();
-            break;
-
-        case CLUSTER:
-            commaSeparated = ((Cluster) entity).getTags();
-            break;
-
-        default:
-            commaSeparated = null;
-            break;
-        }
-
-        final List<String> parsed = new ArrayList<String>();
-        if (StringUtils.isEmpty(commaSeparated)) {
-            return parsed;
-        }
-        for (String piece : commaSeparated.split(",")) {
-            parsed.add(piece.trim());
-        }
-        return parsed;
-    }
-
-    /**
-     * Splits the comma-separated pipelines of a process into a list of trimmed strings.
-     *
-     * Pipelines are only read from Process entities.
-     *
-     * @param entity entity to inspect
-     * @return list of pipeline names; empty for non-process entities or when none are set
-     */
-    public static List<String> getPipelines(Entity entity) {
-        final List<String> parsed = new ArrayList<String>();
-        if (!entity.getEntityType().equals(EntityType.PROCESS)) {
-            return parsed;
-        }
-
-        final String commaSeparated = ((Process) entity).getPipelines();
-        if (commaSeparated == null) {
-            return parsed;
-        }
-        for (String piece : commaSeparated.split(",")) {
-            parsed.add(piece.trim());
-        }
-        return parsed;
-    }
-
-    /**
-     * Wraps the dependents of an entity, as reported by the entity graph, in an EntityList.
-     *
-     * @param entity entity whose dependents are wanted
-     * @return EntityList built from the dependents and the entity itself
-     * @throws FalconException declared by this method's signature
-     */
-    public static EntityList getEntityDependencies(Entity entity) throws FalconException {
-        final Set<Entity> dependentSet = EntityGraph.get().getDependents(entity);
-        final Entity[] dependentArray = new Entity[dependentSet.size()];
-        return new EntityList(dependentSet.toArray(dependentArray), entity);
-    }
-
-    /**
-     * Returns the earliest validity start and the latest validity end across all clusters of an entity.
-     *
-     * NOTE(review): when the entity defines no clusters, both intermediate pairs stay null and the final
-     * dereference fails - confirm callers only pass entities with at least one cluster.
-     *
-     * @param entityObject feed or process entity
-     * @return pair of (earliest start, latest end)
-     */
-    public static Pair<Date, Date> getEntityStartEndDates(Entity entityObject) {
-        Pair<Date, String> earliestStart = null;
-        Pair<Date, String> latestEnd = null;
-        for (String cluster : EntityUtil.getClustersDefined(entityObject)) {
-            final Date start = getStartTime(entityObject, cluster);
-            if (earliestStart == null || earliestStart.first.after(start)) {
-                earliestStart = Pair.of(start, cluster);
-            }
-            final Date end = getEndTime(entityObject, cluster);
-            if (latestEnd == null || latestEnd.first.before(end)) {
-                latestEnd = Pair.of(end, cluster);
-            }
-        }
-        return new Pair<Date, Date>(earliestStart.first, latestEnd.first);
-    }
-
-    /**
-     * Returns the previous instance (before or on) for a given referenceTime.
-     *
-     * Example: for a feed in "UTC" with startDate "2014-01-01 00:00" and frequency "days(1)", a referenceTime
-     * of "2015-01-01 00:00" returns "2015-01-01 00:00", and a referenceTime of "2015-01-01 04:00" also
-     * returns "2015-01-01 00:00".
-     *
-     * @param startTime start time of the entity
-     * @param frequency frequency of the entity
-     * @param tz timezone of the entity; UTC is used when null
-     * @param referenceTime time before which the instanceTime is desired
-     * @return instance (before or on) the referenceTime
-     */
-    public static Date getPreviousInstanceTime(Date startTime, Frequency frequency, TimeZone tz, Date referenceTime) {
-        final TimeZone zone = (tz != null) ? tz : TimeZone.getTimeZone("UTC");
-        final Calendar cursor = Calendar.getInstance(zone);
-        cursor.setTime(startTime);
-
-        // Move to the instance identified by getInstanceSequence, which may lie after referenceTime.
-        final int instancesToSkip = getInstanceSequence(startTime, frequency, zone, referenceTime) - 1;
-        cursor.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt() * instancesToSkip);
-
-        // Step back one instance at a time until the cursor is no longer after referenceTime.
-        while (cursor.getTime().after(referenceTime)) {
-            cursor.add(frequency.getTimeUnit().getCalendarUnit(), -frequency.getFrequencyAsInt());
-        }
-        return cursor.getTime();
-    }
-
-    /**
-     * Find the times at which the given entity will run in a given time range.
-     * <p/>
-     * Both start and end Date are inclusive.
-     *
-     * @param entity      feed or process entity whose instance times are to 
be found
-     * @param clusterName name of the cluster
-     * @param startRange  start time for the input range
-     * @param endRange    end time for the input range
-     * @return List of instance times at which the entity will run in the 
given time range
-     */
-    public static List<Date> getEntityInstanceTimes(Entity entity, String 
clusterName, Date startRange, Date endRange) {
-        Date start = null;
-        switch (entity.getEntityType()) {
-
-        case FEED:
-            Feed feed = (Feed) entity;
-            start = FeedHelper.getCluster(feed, clusterName).getValidity().getStart();
-            return getInstanceTimes(start, feed.getFrequency(), feed.getTimezone(),
-                    startRange, endRange);
-
-        case PROCESS:
-            Process process = (Process) entity;
-            start = ProcessHelper.getCluster(process, clusterName).getValidity().getStart();
-            return getInstanceTimes(start, process.getFrequency(),
-                    process.getTimezone(), startRange, endRange);
-
-        default:
-            throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
-        }
-    }
-
-
-    /**
-     * Find instance times given first instance start time and frequency till a given end time.
-     *
-     * It finds the first valid instance time for the given time range, it then uses frequency to find next instances
-     * in the given time range.
-     *
-     * @param startTime startTime of the entity (time of first instance ever of the given entity)
-     * @param frequency frequency of the entity
-     * @param timeZone  timeZone of the entity
-     * @param startRange start time for the input range of interest.
-     * @param endRange end time for the input range of interest.
-     * @return list of instance run times of the given entity in the given time range.
-     */
-    public static List<Date> getInstanceTimes(Date startTime, Frequency frequency, TimeZone timeZone,
-                                              Date startRange, Date endRange) {
-        List<Date> result = new LinkedList<>();
-        if (timeZone == null) {
-            timeZone = TimeZone.getTimeZone("UTC");
-        }
-
-        Date current = getPreviousInstanceTime(startTime, frequency, timeZone, startRange);
-        while (true) {
-            Date nextStartTime = getNextStartTime(startTime, frequency, timeZone, current);
-            if (nextStartTime.after(endRange)){
-                break;
-            }
-            result.add(nextStartTime);
-            // this is required because getNextStartTime returns greater than or equal to referenceTime
-            current = new Date(nextStartTime.getTime() + ONE_MS); // 1 milli seconds later
-        }
-        return result;
-    }
-
-    /**
-     * Returns Data Source Type given a feed with Import policy.
-     *
-     * @param cluster
-     * @param feed
-     * @return
-     * @throws FalconException
-     */
-
-    public static DatasourceType getImportDatasourceType(
-            Cluster cluster, Feed feed) throws FalconException {
-        return FeedHelper.getImportDatasourceType(cluster, feed);
-    }
-
-    /**
-     * Returns Data Source Type given a feed with Export policy.
-     *
-     * @param cluster
-     * @param feed
-     * @return
-     * @throws FalconException
-     */
-
-    public static DatasourceType getExportDatasourceType(
-            Cluster cluster, Feed feed) throws FalconException {
-        return FeedHelper.getExportDatasourceType(cluster, feed);
-    }
-
-    public static EntityNotification getEntityNotification(Entity entity) {
-        switch (entity.getEntityType()) {
-        case FEED:
-            Feed feed = (Feed) entity;
-            return feed.getNotification();
-        case PROCESS:
-            Process process = (Process) entity;
-            return process.getNotification();
-
-        default:
-            throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
-        }
-    }
-
-
-    /**
-     * @param properties - String of format key1:value1, key2:value2
-     * @return
-     */
-    public static Map<String, String> getPropertyMap(String properties) {
-        Map<String, String> props = new HashMap<>();
-        if (StringUtils.isNotEmpty(properties)) {
-            String[] kvPairs = properties.split(",");
-            for (String kvPair : kvPairs) {
-                String[] keyValue = kvPair.trim().split(":", 2);
-                if (keyValue.length == 2 && !keyValue[0].trim().isEmpty() && !keyValue[1].trim().isEmpty()) {
-                    props.put(keyValue[0].trim(), keyValue[1].trim());
-                } else {
-                    throw new IllegalArgumentException("Found invalid property " + keyValue[0]
-                            + ". Schedule properties must be comma separated key-value pairs. "
-                            + " Example: key1:value1,key2:value2");
-                }
-            }
-        }
-        return props;
-    }
-
-    public static JOBPRIORITY getPriority(Process process) {
-        org.apache.falcon.entity.v0.process.Properties processProps = process.getProperties();
-        if (processProps != null) {
-            for (org.apache.falcon.entity.v0.process.Property prop : processProps.getProperties()) {
-                if (prop.getName().equals(MR_JOB_PRIORITY)) {
-                    return JOBPRIORITY.valueOf(prop.getValue());
-                }
-            }
-        }
-        return JOBPRIORITY.NORMAL;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/ExternalId.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ExternalId.java b/common/src/main/java/org/apache/falcon/entity/ExternalId.java
deleted file mode 100644
index 688d5a6..0000000
--- a/common/src/main/java/org/apache/falcon/entity/ExternalId.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.v0.SchemaHelper;
-
-import java.util.Date;
-
-/**
- * External id as represented by workflow engine.
- */
-public class ExternalId {
-    private static final String SEPARATOR = "/";
-    private String id;
-
-    public ExternalId(String id) {
-        this.id = id;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public ExternalId(String name, Tag tag, String elexpr) {
-        if (StringUtils.isEmpty(name) || tag == null || StringUtils.isEmpty(elexpr)) {
-            throw new IllegalArgumentException("Empty inputs!");
-        }
-
-        id = name + SEPARATOR + tag.name() + SEPARATOR + elexpr;
-    }
-
-    public ExternalId(String name, Tag tag, Date date) {
-        this(name, tag, SchemaHelper.formatDateUTC(date));
-    }
-
-    public String getName() {
-        String[] parts = id.split(SEPARATOR);
-        return parts[0];
-    }
-
-    public Date getDate() throws FalconException {
-        return EntityUtil.parseDateUTC(getDateAsString());
-    }
-
-    public String getDateAsString() {
-        String[] parts = id.split(SEPARATOR);
-        return parts[2];
-    }
-
-    public Tag getTag() {
-        String[] parts = id.split(SEPARATOR);
-        return Tag.valueOf(parts[1]);
-    }
-
-    public String getDFSname() {
-        return id.replace(":", "-");
-    }
-}

Reply via email to