http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
 
b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
deleted file mode 100644
index 887164e..0000000
--- 
a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang.Validate;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.EntityNotRegisteredException;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.v0.AccessControlList;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Default implementation of AuthorizationProvider in Falcon.
- *
- * The authorization is enforced in the following way:
- *
- * if admin resource,
- *      if authenticated user name matches the admin users configuration
- *      Else if groups of the authenticated user matches the admin groups 
configuration
- * Else if entities or instance resource
- *      if the authenticated user matches the owner in ACL for the entity
- *      Else if the groups of the authenticated user matches the group in ACL 
for the entity
- * Else if lineage resource
- *      All have read-only permissions
- * Else bad resource
- */
-public class DefaultAuthorizationProvider implements AuthorizationProvider {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultAuthorizationProvider.class);
-
-    private static final Set<String> RESOURCES = new HashSet<String>(
-            Arrays.asList(new String[]{"admin", "entities", "instance", 
"metadata", }));
-
-    /**
-     * Constant for the configuration property that indicates the prefix.
-     */
-    protected static final String FALCON_PREFIX = 
"falcon.security.authorization.";
-
-    /**
-     * Constant for the configuration property that indicates the blacklisted 
super users for falcon.
-     */
-    private static final String ADMIN_USERS_KEY = FALCON_PREFIX + 
"admin.users";
-    private static final String ADMIN_GROUPS_KEY = FALCON_PREFIX + 
"admin.groups";
-
-    /**
-     * The super-user is the user with the same identity as falcon process 
itself.
-     * Loosely, if you started falcon, then you are the super-user.
-     */
-    protected static final String SUPER_USER = System.getProperty("user.name");
-
-    /**
-     * Constant for the configuration property that indicates the super user 
group.
-     */
-    private static final String SUPER_USER_GROUP_KEY = FALCON_PREFIX + 
"superusergroup";
-
-    /**
-     * Super user group.
-     */
-    private final String superUserGroup;
-    private final Set<String> adminUsers;
-    private final Set<String> adminGroups;
-
-    public DefaultAuthorizationProvider() {
-        superUserGroup = 
StartupProperties.get().getProperty(SUPER_USER_GROUP_KEY);
-        adminUsers = getAdminNamesFromConfig(ADMIN_USERS_KEY);
-        adminGroups = getAdminNamesFromConfig(ADMIN_GROUPS_KEY);
-    }
-
-    private Set<String> getAdminNamesFromConfig(String key) {
-        Set<String> adminNames = new HashSet<String>();
-        String adminNamesConfig = StartupProperties.get().getProperty(key);
-        if (!StringUtils.isEmpty(adminNamesConfig)) {
-            adminNames.addAll(Arrays.asList(adminNamesConfig.split(",")));
-        }
-
-        return Collections.unmodifiableSet(adminNames);
-    }
-
-    /**
-     * Determines if the authenticated user is the user who started this 
process
-     * or belongs to the super user group.
-     *
-     * @param authenticatedUGI UGI
-     * @return true if super user else false.
-     */
-    public boolean isSuperUser(UserGroupInformation authenticatedUGI) {
-        return SUPER_USER.equals(authenticatedUGI.getShortUserName())
-            || (!StringUtils.isEmpty(superUserGroup)
-                    && isUserInGroup(superUserGroup, authenticatedUGI));
-    }
-
-    /**
-     * Checks if authenticated user should proxy the entity acl owner.
-     *
-     * @param authenticatedUGI  proxy ugi for the authenticated user.
-     * @param aclOwner          entity ACL Owner.
-     * @param aclGroup          entity ACL group.
-     * @throws IOException
-     */
-    @Override
-    public boolean shouldProxy(UserGroupInformation authenticatedUGI,
-                               final String aclOwner,
-                               final String aclGroup) throws IOException {
-        Validate.notNull(authenticatedUGI, "User cannot be empty or null");
-        Validate.notEmpty(aclOwner, "User cannot be empty or null");
-        Validate.notEmpty(aclGroup, "Group cannot be empty or null");
-
-        return isSuperUser(authenticatedUGI)
-            || (!isUserACLOwner(authenticatedUGI.getShortUserName(), aclOwner)
-                    && isUserInGroup(aclGroup, authenticatedUGI));
-    }
-
-    /**
-     * Determines if the authenticated user is authorized to execute the 
action on the resource.
-     * Throws an exception if not authorized.
-     *
-     * @param resource   api resource, admin, entities or instance
-     * @param action     action being authorized on resource and entity if 
applicable
-     * @param entityType entity type in question, not for admin resource
-     * @param entityName entity name in question, not for admin resource
-     * @param authenticatedUGI   proxy ugi for the authenticated user
-     * @throws org.apache.hadoop.security.authorize.AuthorizationException
-     */
-    @Override
-    public void authorizeResource(String resource, String action,
-                                  String entityType, String entityName,
-                                  UserGroupInformation authenticatedUGI)
-        throws AuthorizationException, EntityNotRegisteredException {
-
-        Validate.notEmpty(resource, "Resource cannot be empty or null");
-        Validate.isTrue(RESOURCES.contains(resource), "Illegal resource: " + 
resource);
-        Validate.notEmpty(action, "Action cannot be empty or null");
-
-        try {
-            if (isSuperUser(authenticatedUGI)) {
-                return;
-            }
-
-            if ("admin".equals(resource)) {
-                if (!("version".equals(action) || "clearuser".equals(action) 
|| "getuser".equals(action))) {
-                    authorizeAdminResource(authenticatedUGI, action);
-                }
-            } else if ("entities".equals(resource) || 
"instance".equals(resource)) {
-                authorizeEntityResource(authenticatedUGI, entityName, 
entityType, action);
-            } else if ("metadata".equals(resource)) {
-                authorizeMetadataResource(authenticatedUGI, action);
-            }
-        } catch (IOException e) {
-            throw new AuthorizationException(e);
-        }
-    }
-
-    protected Set<String> getGroupNames(UserGroupInformation proxyUgi) {
-        return new HashSet<String>(Arrays.asList(proxyUgi.getGroupNames()));
-    }
-
-    /**
-     * Determines if the authenticated user is authorized to execute the 
action on the entity.
-     * Throws an exception if not authorized.
-     *
-     * @param entityName entity in question, applicable for entities and 
instance resource
-     * @param entityType entity in question, applicable for entities and 
instance resource
-     * @param acl        entity ACL
-     * @param action     action being authorized on resource and entity if 
applicable
-     * @param authenticatedUGI   proxy ugi for the authenticated user
-     * @throws org.apache.hadoop.security.authorize.AuthorizationException
-     */
-    @Override
-    public void authorizeEntity(String entityName, String entityType, 
AccessControlList acl,
-                                String action, UserGroupInformation 
authenticatedUGI)
-        throws AuthorizationException {
-
-        try {
-            LOG.info("Authorizing authenticatedUser={}, action={}, entity={}, 
type{}",
-                    authenticatedUGI.getShortUserName(), action, entityName, 
entityType);
-
-            if (isSuperUser(authenticatedUGI)) {
-                return;
-            }
-
-            checkUser(entityName, acl.getOwner(), acl.getGroup(), action, 
authenticatedUGI);
-        } catch (IOException e) {
-            throw new AuthorizationException(e);
-        }
-    }
-
-    /**
-     * Validate if the entity owner is the logged-in authenticated user.
-     *
-     * @param entityName        entity name.
-     * @param aclOwner          entity ACL Owner.
-     * @param aclGroup          entity ACL group.
-     * @param action            action being authorized on resource and entity 
if applicable.
-     * @param authenticatedUGI          proxy ugi for the authenticated user.
-     * @throws AuthorizationException
-     */
-    protected void checkUser(String entityName, String aclOwner, String 
aclGroup, String action,
-                             UserGroupInformation authenticatedUGI) throws 
AuthorizationException {
-        final String authenticatedUser = authenticatedUGI.getShortUserName();
-        if (isUserACLOwner(authenticatedUser, aclOwner)
-                || isUserInGroup(aclGroup, authenticatedUGI)) {
-            return;
-        }
-
-        StringBuilder message = new StringBuilder("Permission denied: 
authenticatedUser=");
-        message.append(authenticatedUser);
-        message.append(!authenticatedUser.equals(aclOwner)
-                ? " not entity owner=" + aclOwner
-                : " not in group=" + aclGroup);
-        message.append(", entity=").append(entityName).append(", 
action=").append(action);
-
-        LOG.error(message.toString());
-        throw new AuthorizationException(message.toString());
-    }
-
-    /**
-     * Determines if the authenticated user is the entity ACL owner.
-     *
-     * @param authenticatedUser authenticated user
-     * @param aclOwner          entity ACL owner
-     * @return true if authenticated user is the entity acl owner, false 
otherwise.
-     */
-    protected boolean isUserACLOwner(String authenticatedUser, String 
aclOwner) {
-        return authenticatedUser.equals(aclOwner);
-    }
-
-    /**
-     * Checks if the user's group matches the entity ACL group.
-     *
-     * @param group    Entity ACL group.
-     * @param proxyUgi proxy ugi for the authenticated user.
-     * @return true if user groups contains entity acl group.
-     */
-    protected boolean isUserInGroup(String group, UserGroupInformation 
proxyUgi) {
-        Set<String> groups = getGroupNames(proxyUgi);
-        return groups.contains(group);
-    }
-
-    /**
-     * Check if the user has admin privileges.
-     *
-     * @param authenticatedUGI proxy ugi for the authenticated user.
-     * @param action   admin action on the resource.
-     * @throws AuthorizationException if the user does not have admin 
privileges.
-     */
-    protected void authorizeAdminResource(UserGroupInformation 
authenticatedUGI,
-                                          String action) throws 
AuthorizationException {
-        final String authenticatedUser = authenticatedUGI.getShortUserName();
-        LOG.debug("Authorizing user={} for admin, action={}", 
authenticatedUser, action);
-        if (adminUsers.contains(authenticatedUser) || 
isUserInAdminGroups(authenticatedUGI)) {
-            return;
-        }
-
-        LOG.error("Permission denied: user {} does not have admin privilege 
for action={}",
-                authenticatedUser, action);
-        throw new AuthorizationException("Permission denied: user=" + 
authenticatedUser
-                + " does not have admin privilege for action=" + action);
-    }
-
-    protected boolean isUserInAdminGroups(UserGroupInformation proxyUgi) {
-        final Set<String> groups = getGroupNames(proxyUgi);
-        groups.retainAll(adminGroups);
-        return !groups.isEmpty();
-    }
-
-    protected void authorizeEntityResource(UserGroupInformation 
authenticatedUGI,
-                                           String entityName, String 
entityType,
-                                           String action)
-        throws AuthorizationException, EntityNotRegisteredException {
-
-        Validate.notEmpty(entityType, "Entity type cannot be empty or null");
-        LOG.debug("Authorizing authenticatedUser={} against entity/instance 
action={}, "
-                + "entity name={}, entity type={}",
-                authenticatedUGI.getShortUserName(), action, entityName, 
entityType);
-
-        if (entityName != null) { // lifecycle actions
-            Entity entity = getEntity(entityName, entityType);
-            authorizeEntity(entity.getName(), entity.getEntityType().name(),
-                entity.getACL(), action, authenticatedUGI);
-        } else {
-            // non lifecycle actions, lifecycle actions with null entity will 
validate later
-            LOG.info("Authorization for action={} will be done in the API", 
action);
-        }
-    }
-
-    private Entity getEntity(String entityName, String entityType)
-        throws EntityNotRegisteredException, AuthorizationException {
-
-        try {
-            EntityType type = EntityType.getEnum(entityType);
-            return EntityUtil.getEntity(type, entityName);
-        } catch (FalconException e) {
-            if (e instanceof EntityNotRegisteredException) {
-                throw (EntityNotRegisteredException) e;
-            } else {
-                throw new AuthorizationException(e);
-            }
-        }
-    }
-
-    protected void authorizeMetadataResource(UserGroupInformation 
authenticatedUGI,
-                                             String action) throws 
AuthorizationException {
-        LOG.debug("User {} authorized for action {} ", 
authenticatedUGI.getShortUserName(), action);
-        // todo - read-only for all metadata but needs to be implemented
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java 
b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
deleted file mode 100644
index c187358..0000000
--- a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.security;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
-import 
org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
-import 
org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- * Security Util - bunch of security related helper methods.
- */
-public final class SecurityUtil {
-
-    /**
-     * Constant for the configuration property that indicates the prefix.
-     */
-    private static final String CONFIG_PREFIX = "falcon.authentication.";
-
-    /**
-     * Constant for the configuration property that indicates the 
authentication type.
-     */
-    public static final String AUTHENTICATION_TYPE = CONFIG_PREFIX + "type";
-
-    /**
-     * Constant for the configuration property that indicates the Name node 
principal.
-     */
-    public static final String NN_PRINCIPAL = 
"dfs.namenode.kerberos.principal";
-
-    /**
-     * Constant for the configuration property that indicates the Name node 
principal.
-     * This is used to talk to Hive Meta Store during parsing and validations 
only.
-     */
-    public static final String HIVE_METASTORE_KERBEROS_PRINCIPAL = 
"hive.metastore.kerberos.principal";
-
-    public static final String METASTORE_USE_THRIFT_SASL = 
"hive.metastore.sasl.enabled";
-
-    public static final String METASTORE_PRINCIPAL = 
"hcat.metastore.principal";
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(SecurityUtil.class);
-
-    private SecurityUtil() {
-    }
-
-    public static String getAuthenticationType() {
-        return StartupProperties.get().getProperty(
-                AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE);
-    }
-
-    /**
-     * Checks if kerberos authentication is enabled in the configuration.
-     *
-     * @return true if falcon.authentication.type is kerberos, false otherwise
-     */
-    public static boolean isSecurityEnabled() {
-        String authenticationType = StartupProperties.get().getProperty(
-                AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE);
-
-        final boolean useKerberos;
-        if (authenticationType == null || 
PseudoAuthenticationHandler.TYPE.equals(authenticationType)) {
-            useKerberos = false;
-        } else if 
(KerberosAuthenticationHandler.TYPE.equals(authenticationType)) {
-            useKerberos = true;
-        } else {
-            throw new IllegalArgumentException("Invalid attribute value for "
-                    + AUTHENTICATION_TYPE + " of " + authenticationType);
-        }
-
-        return useKerberos;
-    }
-
-    public static String getLocalHostName() throws UnknownHostException {
-        return InetAddress.getLocalHost().getCanonicalHostName();
-    }
-
-    /**
-     * Checks if authorization is enabled in the configuration.
-     *
-     * @return true if falcon.security.authorization.enabled is enabled, false 
otherwise
-     */
-    public static boolean isAuthorizationEnabled() {
-        return Boolean.valueOf(StartupProperties.get().getProperty(
-                "falcon.security.authorization.enabled", "false"));
-    }
-
-    public static AuthorizationProvider getAuthorizationProvider() throws 
FalconException {
-        String providerClassName = StartupProperties.get().getProperty(
-                "falcon.security.authorization.provider",
-                "org.apache.falcon.security.DefaultAuthorizationProvider");
-        return ReflectionUtils.getInstanceByClassName(providerClassName);
-    }
-
-    public static void tryProxy(Entity entity, final String doAsUser) throws 
IOException, FalconException {
-        if (entity != null && entity.getACL() != null && 
SecurityUtil.isAuthorizationEnabled()) {
-            final String aclOwner = entity.getACL().getOwner();
-            final String aclGroup = entity.getACL().getGroup();
-
-            if (StringUtils.isNotEmpty(doAsUser)) {
-                if (!doAsUser.equalsIgnoreCase(aclOwner)) {
-                    LOG.warn("doAs user {} not same as acl owner {}. Ignoring 
acl owner.", doAsUser, aclOwner);
-                    throw new FalconException("doAs user and ACL owner 
mismatch. doAs user " + doAsUser
-                            +  " should be same as ACL owner " + aclOwner);
-                }
-                return;
-            }
-            if (SecurityUtil.getAuthorizationProvider().shouldProxy(
-                    CurrentUser.getAuthenticatedUGI(), aclOwner, aclGroup)) {
-                CurrentUser.proxy(aclOwner, aclGroup);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java
 
b/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java
deleted file mode 100644
index e20b0b5..0000000
--- 
a/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.Entity;
-
-/**
- * Configuration change notification listener.
- */
-public interface ConfigurationChangeListener {
-
-    /**
-     * This is upon adding a new entity to Store.
-     *
-     * @param entity entity object
-     * @throws FalconException
-     */
-    void onAdd(Entity entity) throws FalconException;
-
-    /**
-     * This is upon removing an existing entity from the Store.
-     *
-     * @param entity entity object
-     * @throws FalconException
-     */
-    void onRemove(Entity entity) throws FalconException;
-
-    /**
-     * This is upon updating an entity to the store.
-     *
-     * @param oldEntity old entity object
-     * @param newEntity updated entity object
-     * @throws FalconException
-     */
-    void onChange(Entity oldEntity, Entity newEntity) throws FalconException;
-
-    /**
-     * This is when existing entities are read from the store during startup.
-     *
-     * @param entity entity object
-     * @throws FalconException
-     */
-    void onReload(Entity entity) throws FalconException;
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/FalconService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/FalconService.java 
b/common/src/main/java/org/apache/falcon/service/FalconService.java
deleted file mode 100644
index a1eb8e0..0000000
--- a/common/src/main/java/org/apache/falcon/service/FalconService.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Falcon service initialized at startup.
- */
-public interface FalconService {
-
-    String getName();
-
-    void init() throws FalconException;
-
-    void destroy() throws FalconException;
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/GroupsService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/GroupsService.java 
b/common/src/main/java/org/apache/falcon/service/GroupsService.java
deleted file mode 100644
index dd4d946..0000000
--- a/common/src/main/java/org/apache/falcon/service/GroupsService.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Groups;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * The GroupsService class delegates to the Hadoop's 
<code>org.apache.hadoop.security.Groups</code>
- * to retrieve the groups a user belongs to.
- */
-public class GroupsService implements FalconService {
-    private org.apache.hadoop.security.Groups hGroups;
-
-    public static final String SERVICE_NAME = 
GroupsService.class.getSimpleName();
-
-    /**
-     * Initializes the service.
-     */
-    @Override
-    public void init() {
-        hGroups = new Groups(new Configuration(true));
-    }
-
-    /**
-     * Destroys the service.
-     */
-    @Override
-    public void destroy() {
-    }
-
-    @Override
-    public String getName() {
-        return SERVICE_NAME;
-    }
-
-    /**
-     * Returns the list of groups a user belongs to.
-     *
-     * @param user user name.
-     * @return the groups the given user belongs to.
-     * @throws java.io.IOException thrown if there was an error retrieving the 
groups of the user.
-     */
-    public List<String> getGroups(String user) throws IOException {
-        return hGroups.getGroups(user);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java 
b/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java
deleted file mode 100644
index b8c979e..0000000
--- a/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.lifecycle.FeedLifecycleStage;
-import org.apache.falcon.lifecycle.LifecyclePolicy;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Stores all internal and external feed lifecycle policies.
- */
-public final class LifecyclePolicyMap implements FalconService {
-    private static final Logger LOG = 
LoggerFactory.getLogger(LifecyclePolicyMap.class);
-    private static final LifecyclePolicyMap STORE = new LifecyclePolicyMap();
-
-    private final Map<String, LifecyclePolicy> policyMap = new HashMap<>();
-
-    private LifecyclePolicyMap() {}
-
-    public static LifecyclePolicyMap get() {
-        return STORE;
-    }
-
-    public LifecyclePolicy get(String policyName) {
-        return policyMap.get(policyName);
-    }
-
-    @Override
-    public String getName() {
-        return getClass().getSimpleName();
-    }
-
-    @Override
-    public void init() throws FalconException {
-        String[] policyNames = 
StartupProperties.get().getProperty("falcon.feed.lifecycle.policies").split(",");
-        for (String name : policyNames) {
-            LifecyclePolicy policy = 
ReflectionUtils.getInstanceByClassName(name);
-            LOG.debug("Loaded policy : {} for stage : {}", policy.getName(), 
policy.getStage());
-            policyMap.put(policy.getName(), policy);
-        }
-        validate();
-    }
-
-    @Override
-    public void destroy() throws FalconException {
-        policyMap.clear();
-    }
-
-    // validate that default policy for each stage is available
-    private void validate() throws FalconException {
-        for (FeedLifecycleStage stage : FeedLifecycleStage.values()) {
-            if (!policyMap.containsKey(stage.getDefaultPolicyName())) {
-                throw new FalconException("Default Policy: " + 
stage.getDefaultPolicyName()
-                        + " for stage: " + stage.name() + "was not found.");
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java 
b/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
deleted file mode 100644
index 9962102..0000000
--- a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.service;
-
-import java.util.Date;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.servlet.jsp.el.ELException;
-import javax.servlet.jsp.el.ExpressionEvaluator;
-
-import org.apache.commons.el.ExpressionEvaluatorImpl;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.aspect.GenericAlert;
-import org.apache.falcon.cleanup.AbstractCleanupHandler;
-import org.apache.falcon.cleanup.FeedCleanupHandler;
-import org.apache.falcon.cleanup.ProcessCleanupHandler;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.util.StartupProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Log cleanup service.
- */
-public class LogCleanupService implements FalconService {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(LogCleanupService.class);
-    private final ExpressionEvaluator evaluator = new 
ExpressionEvaluatorImpl();
-    private final ExpressionHelper resolver = ExpressionHelper.get();
-
-    @Override
-    public String getName() {
-        return "Falcon Log cleanup service";
-    }
-
-    @Override
-    public void init() throws FalconException {
-        Timer timer = new Timer();
-        timer.schedule(new CleanupThread(), 0, getDelay());
-        LOG.info("Falcon log cleanup service initialized");
-    }
-
-    private static class CleanupThread extends TimerTask {
-
-        private final AbstractCleanupHandler processCleanupHandler = new 
ProcessCleanupHandler();
-        private final AbstractCleanupHandler feedCleanupHandler = new 
FeedCleanupHandler();
-
-        @Override
-        public void run() {
-            try {
-                LOG.info("Cleaning up logs at: {}", new Date());
-                processCleanupHandler.cleanup();
-                feedCleanupHandler.cleanup();
-            } catch (Throwable t) {
-                LOG.error("Error in cleanup task: ", t);
-                GenericAlert.alertLogCleanupServiceFailed(
-                        "Exception in log cleanup service", t);
-            }
-        }
-    }
-
-    @Override
-    public void destroy() throws FalconException {
-        LOG.info("Falcon log cleanup service destroyed");
-    }
-
-    private long getDelay() throws FalconException {
-        String delay = StartupProperties.get().getProperty(
-                "falcon.cleanup.service.frequency", "days(1)");
-        try {
-            return (Long) evaluator.evaluate("${" + delay + "}", Long.class,
-                    resolver, resolver);
-        } catch (ELException e) {
-            throw new FalconException("Exception in EL evaluation", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/ProxyUserService.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/service/ProxyUserService.java 
b/common/src/main/java/org/apache/falcon/service/ProxyUserService.java
deleted file mode 100644
index 364c750..0000000
--- a/common/src/main/java/org/apache/falcon/service/ProxyUserService.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.util.RuntimeProperties;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.security.AccessControlException;
-import java.text.MessageFormat;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The ProxyUserService checks if a user of a request has proxyuser privileges.
- * <p>
- * This check is based on the following criteria:
- * <p>
- * <ul>
- *     <li>The user of the request must be configured as proxy user in Falcon 
runtime properties.</li>
- *     <li>The user of the request must be making the request from a 
whitelisted host.</li>
- *     <li>The user of the request must be making the request on behalf of a 
user of a whitelisted group.</li>
- * </ul>
- * <p>
- */
-public class ProxyUserService implements FalconService {
-    private static final Logger LOG = 
LoggerFactory.getLogger(ProxyUserService.class);
-
-
-    private Map<String, Set<String>> proxyUserHosts = new HashMap<>();
-    private Map<String, Set<String>> proxyUserGroups = new HashMap<>();
-
-    private static final String CONF_PREFIX = 
"falcon.service.ProxyUserService.proxyuser.";
-    private static final String GROUPS = ".groups";
-    private static final String HOSTS = ".hosts";
-    public static final String SERVICE_NAME = 
ProxyUserService.class.getSimpleName();
-
-    @Override
-    public String getName() {
-        return SERVICE_NAME;
-    }
-
-    /**
-     * Initializes the service.
-     * @throws FalconException thrown if the service could not be configured 
correctly.
-     */
-    @Override
-    public void init() throws FalconException {
-        Set<Map.Entry<Object, Object>> entrySet = 
RuntimeProperties.get().entrySet();
-
-        for (Map.Entry<Object, Object> entry : entrySet) {
-            String key = (String) entry.getKey();
-
-            if (key.startsWith(CONF_PREFIX) && key.endsWith(GROUPS)) {
-                String proxyUser = key.substring(0, key.lastIndexOf(GROUPS));
-                if (RuntimeProperties.get().getProperty(proxyUser + HOSTS) == 
null) {
-                    throw new FalconException(proxyUser + HOSTS + " property 
not set in runtime "
-                            + "properties. Please add it.");
-                }
-                proxyUser = proxyUser.substring(CONF_PREFIX.length());
-                String value = ((String) entry.getValue()).trim();
-                LOG.info("Loading proxyuser settings [{}]=[{}]", key, value);
-                Set<String> values = null;
-                if (!value.equals("*")) {
-                    values = new HashSet<>(Arrays.asList(value.split(",")));
-                }
-                proxyUserGroups.put(proxyUser, values);
-            }
-            if (key.startsWith(CONF_PREFIX) && key.endsWith(HOSTS)) {
-                String proxyUser = key.substring(0, key.lastIndexOf(HOSTS));
-                if (RuntimeProperties.get().getProperty(proxyUser + GROUPS) == 
null) {
-                    throw new FalconException(proxyUser + GROUPS + " property 
not set in runtime "
-                            + "properties. Please add it.");
-                }
-                proxyUser = proxyUser.substring(CONF_PREFIX.length());
-                String value = ((String) entry.getValue()).trim();
-                LOG.info("Loading proxyuser settings [{}]=[{}]", key, value);
-                Set<String> values = null;
-                if (!value.equals("*")) {
-                    String[] hosts = value.split(",");
-                    for (int i = 0; i < hosts.length; i++) {
-                        String hostName = hosts[i];
-                        try {
-                            hosts[i] = normalizeHostname(hostName);
-                        } catch (Exception ex) {
-                            throw new FalconException("Exception normalizing 
host name: " + hostName + "."
-                                    + ex.getMessage(), ex);
-                        }
-                        LOG.info("Hostname, original [{}], normalized [{}]", 
hostName, hosts[i]);
-                    }
-                    values = new HashSet<>(Arrays.asList(hosts));
-                }
-                proxyUserHosts.put(proxyUser, values);
-            }
-        }
-    }
-
-    /**
-     * Verifies a proxyuser.
-     *
-     * @param proxyUser user name of the proxy user.
-     * @param proxyHost host the proxy user is making the request from.
-     * @param doAsUser user the proxy user is impersonating.
-     * @throws java.io.IOException thrown if an error during the validation 
has occurred.
-     * @throws java.security.AccessControlException thrown if the user is not 
allowed to perform the proxyuser request.
-     */
-    public void validate(String proxyUser, String proxyHost, String doAsUser) 
throws IOException {
-        validateNotEmpty(proxyUser, "proxyUser",
-                "If you're attempting to use user-impersonation via a proxy 
user, please make sure that "
-                        + 
"falcon.service.ProxyUserService.proxyuser.#USER#.hosts and "
-                        + 
"falcon.service.ProxyUserService.proxyuser.#USER#.groups are configured 
correctly"
-        );
-        validateNotEmpty(proxyHost, "proxyHost",
-                "If you're attempting to use user-impersonation via a proxy 
user, please make sure that "
-                        + "falcon.service.ProxyUserService.proxyuser." + 
proxyUser + ".hosts and "
-                        + "falcon.service.ProxyUserService.proxyuser." + 
proxyUser + ".groups are configured correctly"
-        );
-        validateNotEmpty(doAsUser, "doAsUser", null);
-        LOG.debug("Authorization check proxyuser [{}] host [{}] doAs [{}]",
-                proxyUser, proxyHost, doAsUser);
-        if (proxyUserHosts.containsKey(proxyUser)) {
-            validateRequestorHost(proxyUser, proxyHost, 
proxyUserHosts.get(proxyUser));
-            validateGroup(proxyUser, doAsUser, proxyUserGroups.get(proxyUser));
-        } else {
-            throw new AccessControlException(MessageFormat.format("User [{0}] 
not defined as proxyuser. Please add it"
-                            + " to runtime properties.", proxyUser));
-        }
-    }
-
-    private void validateRequestorHost(String proxyUser, String hostname, 
Set<String> validHosts)
-        throws IOException {
-        if (validHosts != null) {
-            if (!validHosts.contains(hostname) && 
!validHosts.contains(normalizeHostname(hostname))) {
-                throw new 
AccessControlException(MessageFormat.format("Unauthorized host [{0}] for 
proxyuser [{1}]",
-                        hostname, proxyUser));
-            }
-        }
-    }
-
-    private void validateGroup(String proxyUser, String user, Set<String> 
validGroups) throws IOException {
-        if (validGroups != null) {
-            List<String> userGroups =  
Services.get().<GroupsService>getService(GroupsService.SERVICE_NAME)
-            .getGroups(user);
-            for (String g : validGroups) {
-                if (userGroups.contains(g)) {
-                    return;
-                }
-            }
-            throw new AccessControlException(
-                    MessageFormat.format("Unauthorized proxyuser [{0}] for 
user [{1}], not in proxyuser groups",
-                            proxyUser, user));
-        }
-    }
-
-    private String normalizeHostname(String name) {
-        try {
-            InetAddress address = InetAddress.getByName(name);
-            return address.getCanonicalHostName();
-        }  catch (IOException ex) {
-            throw new AccessControlException(MessageFormat.format("Could not 
resolve host [{0}], [{1}]", name,
-                    ex.getMessage()));
-        }
-    }
-
-    private static void validateNotEmpty(String str, String name, String info) 
{
-        if (StringUtils.isBlank(str)) {
-            throw new IllegalArgumentException(name + " cannot be null or 
empty" + (info == null ? "" : ", " + info));
-        }
-    }
-
-    /**
-     * Destroys the service.
-     */
-    @Override
-    public void destroy() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java 
b/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java
deleted file mode 100644
index 4708b94..0000000
--- a/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Initializer that Falcon uses at startup to bring up all the falcon startup 
services.
- */
-public class ServiceInitializer {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(ServiceInitializer.class);
-    private final Services services = Services.get();
-
-    public void initialize() throws FalconException {
-        String serviceClassNames = StartupProperties.get().
-                getProperty("application.services", 
"org.apache.falcon.entity.store.ConfigurationStore");
-        for (String serviceClassName : serviceClassNames.split(",")) {
-            serviceClassName = serviceClassName.trim();
-            if (serviceClassName.isEmpty()) {
-                continue;
-            }
-            FalconService service = 
ReflectionUtils.getInstanceByClassName(serviceClassName);
-            services.register(service);
-            LOG.info("Initializing service: {}", serviceClassName);
-            try {
-                service.init();
-            } catch (Throwable t) {
-                LOG.error("Failed to initialize service {}", serviceClassName, 
t);
-                throw new FalconException(t);
-            }
-            LOG.info("Service initialized: {}", serviceClassName);
-        }
-    }
-
-    public void destroy() throws FalconException {
-        for (FalconService service : services) {
-            LOG.info("Destroying service: {}", service.getClass().getName());
-            try {
-                service.destroy();
-            } catch (Throwable t) {
-                LOG.error("Failed to destroy service {}", 
service.getClass().getName(), t);
-                throw new FalconException(t);
-            }
-            LOG.info("Service destroyed: {}", service.getClass().getName());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/Services.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/Services.java 
b/common/src/main/java/org/apache/falcon/service/Services.java
deleted file mode 100644
index 6659ccd..0000000
--- a/common/src/main/java/org/apache/falcon/service/Services.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.util.ReflectionUtils;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-/**
- * Repository of services initialized at startup.
- */
-public final class Services implements Iterable<FalconService> {
-
-    private static final Services INSTANCE = new Services();
-
-    private Services() {
-    }
-
-    public static Services get() {
-        return INSTANCE;
-    }
-
-    private final Map<String, FalconService> services =
-            new LinkedHashMap<String, FalconService>();
-
-    public synchronized void register(FalconService service)
-        throws FalconException {
-
-        if (services.containsKey(service.getName())) {
-            throw new FalconException("Service " + service.getName() + " 
already registered");
-        } else {
-            services.put(service.getName(), service);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    public <T extends FalconService> T getService(String serviceName) {
-        if (services.containsKey(serviceName)) {
-            return (T) services.get(serviceName);
-        } else {
-            throw new NoSuchElementException("Service " + serviceName + " not 
registered with registry");
-        }
-    }
-
-    public boolean isRegistered(String serviceName) {
-        return services.containsKey(serviceName);
-    }
-
-    @Override
-    public Iterator<FalconService> iterator() {
-        return services.values().iterator();
-    }
-
-    public FalconService init(String serviceName) throws FalconException {
-        if (isRegistered(serviceName)) {
-            throw new FalconException("Service is already initialized " + 
serviceName);
-        }
-        FalconService service = ReflectionUtils.getInstance(serviceName + 
".impl");
-        register(service);
-        return service;
-    }
-
-    public void reset() {
-        services.clear();
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java 
b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
deleted file mode 100644
index 6603bc6..0000000
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.update;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Cluster;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Date;
-
-/**
- * Helper methods to facilitate entity updates.
- */
-public final class UpdateHelper {
-    private static final Logger LOG = 
LoggerFactory.getLogger(UpdateHelper.class);
-
-    private static final String[] FEED_FIELDS = new String[]{"partitions", 
"groups", "lateArrival.cutOff",
-                                                             
"schema.location", "schema.provider", "tags",
-                                                             "group", "owner", 
"permission", };
-    private static final String[] PROCESS_FIELDS = new 
String[]{"retry.policy", "retry.delay", "retry.attempts",
-                                                                
"lateProcess.policy", "lateProcess.delay",
-                                                                
"lateProcess.lateInputs[\\d+].input",
-                                                                
"lateProcess.lateInputs[\\d+].workflowPath",
-                                                                "owner", 
"group", "permission", "tags",
-                                                                "pipelines", };
-
-    private UpdateHelper() {}
-
-    public static boolean isEntityUpdated(Entity oldEntity, Entity newEntity, 
String cluster,
-        Path oldStagingPath) throws FalconException {
-        Entity oldView = EntityUtil.getClusterView(oldEntity, cluster);
-        Entity newView = EntityUtil.getClusterView(newEntity, cluster);
-
-        //staging path contains md5 of the cluster view of entity
-        String[] parts = oldStagingPath.getName().split("_");
-        if (parts[0].equals(EntityUtil.md5(newView))) {
-            return false;
-        }
-
-        switch (oldEntity.getEntityType()) {
-        case FEED:
-            return !EntityUtil.equals(oldView, newView, FEED_FIELDS);
-
-        case PROCESS:
-            return !EntityUtil.equals(oldView, newView, PROCESS_FIELDS);
-
-        default:
-        }
-        throw new IllegalArgumentException("Unhandled entity type " + 
oldEntity.getEntityType());
-    }
-
-    public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, 
Entity affectedEntity, String cluster)
-        throws FalconException {
-        if (oldEntity.getEntityType() == EntityType.FEED && 
affectedEntity.getEntityType() == EntityType.PROCESS) {
-
-            Feed oldFeed = (Feed) oldEntity;
-            Feed newFeed = (Feed) newEntity;
-            Process affectedProcess = (Process) affectedEntity;
-
-            //check if affectedProcess is defined for this cluster
-            Cluster processCluster = ProcessHelper.getCluster(affectedProcess, 
cluster);
-            if (processCluster == null) {
-                LOG.debug("Process {} is not defined for cluster {}. 
Skipping", affectedProcess.getName(), cluster);
-                return false;
-            }
-
-            if (processCluster.getValidity().getEnd().before(new Date())) {
-                LOG.debug("Process {} validity {} is in the past. 
Skipping...", affectedProcess.getName(),
-                    processCluster.getValidity().getEnd());
-                return false;
-            }
-
-            if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) {
-                LOG.debug("{}: Frequency has changed. Updating...", 
oldFeed.toShortString());
-                return true;
-            }
-
-            if (!StringUtils.equals(oldFeed.getAvailabilityFlag(), 
newFeed.getAvailabilityFlag())) {
-                LOG.debug("{}: Availability flag has changed. Updating...", 
oldFeed.toShortString());
-                return true;
-            }
-
-            org.apache.falcon.entity.v0.feed.Cluster oldFeedCluster = 
FeedHelper.getCluster(oldFeed, cluster);
-            org.apache.falcon.entity.v0.feed.Cluster newFeedCluster = 
FeedHelper.getCluster(newFeed, cluster);
-            if 
(!oldFeedCluster.getValidity().getStart().equals(newFeedCluster.getValidity().getStart()))
 {
-                LOG.debug("{}: Start time for cluster {} has changed. 
Updating...", oldFeed.toShortString(), cluster);
-                return true;
-            }
-
-            Storage oldFeedStorage = FeedHelper.createStorage(cluster, 
oldFeed);
-            Storage newFeedStorage = FeedHelper.createStorage(cluster, 
newFeed);
-
-            if (!oldFeedStorage.isIdentical(newFeedStorage)) {
-                LOG.debug("{}: Storage has changed. Updating...", 
oldFeed.toShortString());
-                return true;
-            }
-            return false;
-
-        } else {
-            LOG.debug(newEntity.toShortString());
-            LOG.debug(affectedEntity.toShortString());
-            throw new FalconException("Don't know what to do. Unexpected 
scenario");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java 
b/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
deleted file mode 100644
index adf09c4..0000000
--- a/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.util;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Base class for reading application properties.
- */
-public abstract class ApplicationProperties extends Properties {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationProperties.class);
-
-    protected abstract String getPropertyFile();
-
-    protected String domain;
-
-    protected ApplicationProperties() throws FalconException {
-        init();
-    }
-
-    protected void init() throws FalconException {
-        setDomain(System.getProperty("falcon.domain", 
System.getenv("FALCON_DOMAIN")));
-        loadProperties();
-    }
-
-    protected void setDomain(String domain) {
-        this.domain = domain;
-    }
-
-    public String getDomain() {
-        return domain;
-    }
-
-    protected void loadProperties() throws FalconException {
-        String propertyFileName = getPropertyFile();
-        String confDir = System.getProperty("config.location");
-        loadProperties(propertyFileName, confDir);
-    }
-
-    /**
-     * This method reads the given properties file in the following order:
-     * config.location & classpath. It falls back in that specific order.
-     *
-     * @throws FalconException
-     */
-    protected void loadProperties(String propertyFileName, String confDir) 
throws FalconException {
-        try {
-            InputStream resourceAsStream = 
checkConfigLocation(propertyFileName, confDir);
-
-            //Fallback to classpath
-            if (resourceAsStream == null) {
-                resourceAsStream = checkClassPath(propertyFileName);
-            }
-
-            if (resourceAsStream != null) {
-                try {
-                    doLoadProperties(resourceAsStream);
-                    return;
-                } finally {
-                    IOUtils.closeQuietly(resourceAsStream);
-                }
-            }
-            throw new FileNotFoundException("Unable to find: " + 
propertyFileName);
-        } catch (IOException e) {
-            throw new FalconException("Error loading properties file: " + 
getPropertyFile(), e);
-        }
-    }
-
-    private InputStream checkConfigLocation(String propertyFileName, String 
confDir)
-        throws FileNotFoundException {
-
-        InputStream resourceAsStream = null;
-        if (confDir != null) {
-            File fileToLoad = new File(confDir, propertyFileName);
-            resourceAsStream = getResourceAsStream(fileToLoad);
-        }
-        return resourceAsStream;
-    }
-
-    protected InputStream getResourceAsStream(File fileToLoad) throws 
FileNotFoundException {
-        InputStream resourceAsStream = null;
-        if (fileToLoad.exists() && fileToLoad.isFile() && 
fileToLoad.canRead()) {
-            LOG.info("config.location is set, using: {}", 
fileToLoad.getAbsolutePath());
-            resourceAsStream = new FileInputStream(fileToLoad);
-        }
-        return resourceAsStream;
-    }
-
-    protected InputStream checkClassPath(String propertyFileName) {
-
-        InputStream resourceAsStream = null;
-        Class clazz = ApplicationProperties.class;
-        URL resource = clazz.getResource("/" + propertyFileName);
-        if (resource != null) {
-            LOG.info("Fallback to classpath for: {}", resource);
-            resourceAsStream = clazz.getResourceAsStream("/" + 
propertyFileName);
-        } else {
-            resource = clazz.getResource(propertyFileName);
-            if (resource != null) {
-                LOG.info("Fallback to classpath for: {}", resource);
-                resourceAsStream = clazz.getResourceAsStream(propertyFileName);
-            }
-        }
-        return resourceAsStream;
-    }
-
-    private void doLoadProperties(InputStream resourceAsStream) throws 
IOException, FalconException {
-        Properties origProps = new Properties();
-        origProps.load(resourceAsStream);
-        if (domain == null) {
-            domain = origProps.getProperty("*.domain");
-            if (domain == null) {
-                throw new FalconException("Domain is not set!");
-            } else {
-                domain = ExpressionHelper.substitute(domain);
-            }
-        }
-
-        LOG.info("Initializing {} properties with domain {}", 
this.getClass().getName(), domain);
-        Set<String> keys = getKeys(origProps.keySet());
-        for (String key : keys) {
-            String value = origProps.getProperty(domain + "." + key, 
origProps.getProperty("*." + key));
-            if (value != null) {
-                value = ExpressionHelper.substitute(value);
-                LOG.debug("{}={}", key, value);
-                put(key, value);
-            }
-        }
-    }
-
-    protected Set<String> getKeys(Set<Object> keySet) {
-        Set<String> keys = new HashSet<String>();
-        for (Object keyObj : keySet) {
-            String key = (String) keyObj;
-            keys.add(key.substring(key.indexOf('.') + 1));
-        }
-        return keys;
-    }
-
-    @Override
-    public String getProperty(String key) {
-        return StringUtils.trim(super.getProperty(key));
-    }
-
-    @Override
-    public String getProperty(String key, String defaultValue) {
-        return StringUtils.trim(super.getProperty(key, defaultValue));
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/BuildProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/BuildProperties.java 
b/common/src/main/java/org/apache/falcon/util/BuildProperties.java
deleted file mode 100644
index 339dcb5..0000000
--- a/common/src/main/java/org/apache/falcon/util/BuildProperties.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.util;
-
-import org.apache.falcon.FalconException;
-
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Application build info properties are exposed through this.
- */
-public final class BuildProperties extends ApplicationProperties {
-    private static final String PROPERTY_FILE = "falcon-buildinfo.properties";
-
-    private static final AtomicReference<BuildProperties> INSTANCE =
-            new AtomicReference<BuildProperties>();
-
-    private BuildProperties() throws FalconException {
-        super();
-    }
-
-    @Override
-    protected String getPropertyFile() {
-        return PROPERTY_FILE;
-    }
-
-    public static Properties get() {
-        try {
-            if (INSTANCE.get() == null) {
-                INSTANCE.compareAndSet(null, new BuildProperties());
-            }
-            return INSTANCE.get();
-        } catch (FalconException e) {
-            throw new RuntimeException("Unable to read application "
-                + "falcon build information properties", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java 
b/common/src/main/java/org/apache/falcon/util/DateUtil.java
deleted file mode 100644
index baf5b13..0000000
--- a/common/src/main/java/org/apache/falcon/util/DateUtil.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.util;
-
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.Frequency;
-
-import java.util.Calendar;
-import java.util.Date;
-import java.util.TimeZone;
-
-/**
- * Helper to get date operations.
- */
-public final class DateUtil {
-
-    private static final long MINUTE_IN_MS = 60 * 1000L;
-    private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS;
-    private static final long DAY_IN_MS = 24 * HOUR_IN_MS;
-    private static final long MONTH_IN_MS = 31 * DAY_IN_MS;
-
-    //Friday, April 16, 9999 7:12:55 AM UTC corresponding date
-    public static final Date NEVER = new 
Date(Long.parseLong("253379862775000"));
-
-    public static final long HOUR_IN_MILLIS = 60 * 60 * 1000;
-
-    private DateUtil() {}
-
-    public static Date getNextMinute(Date time) throws Exception {
-        Calendar insCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-        insCal.setTime(time);
-        insCal.add(Calendar.MINUTE, 1);
-        return insCal.getTime();
-
-    }
-
-    public static String getDateFormatFromTime(long milliSeconds) {
-        return SchemaHelper.getDateFormat().format((new Date(milliSeconds)));
-    }
-
-    /**
-     * This function should not be used for scheduling related functions as it 
may cause correctness issues in those
-     * scenarios.
-     * @param frequency
-     * @return
-     */
-    public static Long getFrequencyInMillis(Frequency frequency){
-        switch (frequency.getTimeUnit()) {
-
-        case months:
-            return MONTH_IN_MS * frequency.getFrequencyAsInt();
-
-        case days:
-            return DAY_IN_MS * frequency.getFrequencyAsInt();
-
-        case hours:
-            return HOUR_IN_MS * frequency.getFrequencyAsInt();
-
-        case minutes:
-            return MINUTE_IN_MS * frequency.getFrequencyAsInt();
-
-        default:
-            return null;
-        }
-    }
-
-    /**
-     * Returns the current time, with seconds and milliseconds reset to 0.
-     * @return
-     */
-    public static Date now() {
-        Calendar cal = Calendar.getInstance();
-        cal.set(Calendar.SECOND, 0);
-        cal.set(Calendar.MILLISECOND, 0);
-        return cal.getTime();
-    }
-
-    /**
-     * Adds the supplied number of seconds to the given date and returns the 
new Date.
-     * @param date
-     * @param seconds
-     * @return
-     */
-    public static Date offsetTime(Date date, int seconds) {
-        return new Date(1000L * seconds + date.getTime());
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java 
b/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
deleted file mode 100644
index 5879f30..0000000
--- a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.util;
-
-import org.apache.falcon.FalconException;
-
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Application deployment properties. particularly relating to
- * whether the server is in embedded mode or distributed mode.
- */
-public final class DeploymentProperties extends ApplicationProperties {
-    private static final String PROPERTY_FILE = "deploy.properties";
-
-    private static final AtomicReference<DeploymentProperties> INSTANCE =
-            new AtomicReference<>();
-
-    private DeploymentProperties() throws FalconException {
-        super();
-    }
-
-    @Override
-    protected String getPropertyFile() {
-        return PROPERTY_FILE;
-    }
-
-    public static Properties get() {
-        try {
-            if (INSTANCE.get() == null) {
-                INSTANCE.compareAndSet(null, new DeploymentProperties());
-            }
-            return INSTANCE.get();
-        } catch (FalconException e) {
-            throw new RuntimeException("Unable to read application " + 
"startup properties", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java 
b/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
deleted file mode 100644
index 561520c..0000000
--- a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.util;
-
-import org.apache.falcon.entity.ColoClusterRelation;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Helper methods to deployment properties.
- */
-public final class DeploymentUtil {
-    private static final Logger LOG = 
LoggerFactory.getLogger(DeploymentUtil.class);
-
-    protected static final String DEFAULT_COLO = "default";
-    protected static final String EMBEDDED = "embedded";
-    protected static final String DEPLOY_MODE = "deploy.mode";
-    private static final Set<String> DEFAULT_ALL_COLOS = new HashSet<String>();
-
-    protected static final String CURRENT_COLO;
-    protected static final boolean EMBEDDED_MODE;
-    private static boolean prism = false;
-
-    static {
-        DEFAULT_ALL_COLOS.add(DEFAULT_COLO);
-        EMBEDDED_MODE = DeploymentProperties.get().
-                getProperty(DEPLOY_MODE, EMBEDDED).equals(EMBEDDED);
-        if (EMBEDDED_MODE) {
-            CURRENT_COLO = DEFAULT_COLO;
-        } else {
-            CURRENT_COLO = StartupProperties.get().
-                    getProperty("current.colo", DEFAULT_COLO);
-        }
-        LOG.info("Running in embedded mode? {}", EMBEDDED_MODE);
-        LOG.info("Current colo: {}", CURRENT_COLO);
-    }
-
-    private DeploymentUtil() {}
-
-    public static void setPrismMode() {
-        prism = true;
-    }
-
-    public static boolean isPrism() {
-        return !EMBEDDED_MODE && prism;
-    }
-
-    public static String getCurrentColo() {
-        return CURRENT_COLO;
-    }
-
-    public static Set<String> getCurrentClusters() {
-        // return all clusters in embedded mode
-        if (EMBEDDED_MODE) {
-            Collection<String> allClusters = 
ConfigurationStore.get().getEntities(EntityType.CLUSTER);
-            Set<String> result = new HashSet<>(allClusters);
-            return result;
-        }
-        String colo = getCurrentColo();
-        return ColoClusterRelation.get().getClusters(colo);
-    }
-
-    public static boolean isEmbeddedMode() {
-        return EMBEDDED_MODE;
-    }
-
-    public static String getDefaultColo() {
-        return DEFAULT_COLO;
-    }
-
-    public static Set<String> getDefaultColos() {
-        DEFAULT_ALL_COLOS.add(DEFAULT_COLO);
-        return DEFAULT_ALL_COLOS;
-    }
-}

Reply via email to