http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java deleted file mode 100644 index 887164e..0000000 --- a/common/src/main/java/org/apache/falcon/security/DefaultAuthorizationProvider.java +++ /dev/null @@ -1,335 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.security; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang.Validate; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.EntityNotRegisteredException; -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.v0.AccessControlList; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.util.StartupProperties; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -/** - * Default implementation of AuthorizationProvider in Falcon. - * - * The authorization is enforced in the following way: - * - * if admin resource, - * if authenticated user name matches the admin users configuration - * Else if groups of the authenticated user matches the admin groups configuration - * Else if entities or instance resource - * if the authenticated user matches the owner in ACL for the entity - * Else if the groups of the authenticated user matches the group in ACL for the entity - * Else if lineage resource - * All have read-only permissions - * Else bad resource - */ -public class DefaultAuthorizationProvider implements AuthorizationProvider { - - private static final Logger LOG = LoggerFactory.getLogger(DefaultAuthorizationProvider.class); - - private static final Set<String> RESOURCES = new HashSet<String>( - Arrays.asList(new String[]{"admin", "entities", "instance", "metadata", })); - - /** - * Constant for the configuration property that indicates the prefix. - */ - protected static final String FALCON_PREFIX = "falcon.security.authorization."; - - /** - * Constant for the configuration property that indicates the blacklisted super users for falcon. - */ - private static final String ADMIN_USERS_KEY = FALCON_PREFIX + "admin.users"; - private static final String ADMIN_GROUPS_KEY = FALCON_PREFIX + "admin.groups"; - - /** - * The super-user is the user with the same identity as falcon process itself. - * Loosely, if you started falcon, then you are the super-user. - */ - protected static final String SUPER_USER = System.getProperty("user.name"); - - /** - * Constant for the configuration property that indicates the super user group. - */ - private static final String SUPER_USER_GROUP_KEY = FALCON_PREFIX + "superusergroup"; - - /** - * Super user group. - */ - private final String superUserGroup; - private final Set<String> adminUsers; - private final Set<String> adminGroups; - - public DefaultAuthorizationProvider() { - superUserGroup = StartupProperties.get().getProperty(SUPER_USER_GROUP_KEY); - adminUsers = getAdminNamesFromConfig(ADMIN_USERS_KEY); - adminGroups = getAdminNamesFromConfig(ADMIN_GROUPS_KEY); - } - - private Set<String> getAdminNamesFromConfig(String key) { - Set<String> adminNames = new HashSet<String>(); - String adminNamesConfig = StartupProperties.get().getProperty(key); - if (!StringUtils.isEmpty(adminNamesConfig)) { - adminNames.addAll(Arrays.asList(adminNamesConfig.split(","))); - } - - return Collections.unmodifiableSet(adminNames); - } - - /** - * Determines if the authenticated user is the user who started this process - * or belongs to the super user group. - * - * @param authenticatedUGI UGI - * @return true if super user else false. - */ - public boolean isSuperUser(UserGroupInformation authenticatedUGI) { - return SUPER_USER.equals(authenticatedUGI.getShortUserName()) - || (!StringUtils.isEmpty(superUserGroup) - && isUserInGroup(superUserGroup, authenticatedUGI)); - } - - /** - * Checks if authenticated user should proxy the entity acl owner. - * - * @param authenticatedUGI proxy ugi for the authenticated user. - * @param aclOwner entity ACL Owner. - * @param aclGroup entity ACL group. - * @throws IOException - */ - @Override - public boolean shouldProxy(UserGroupInformation authenticatedUGI, - final String aclOwner, - final String aclGroup) throws IOException { - Validate.notNull(authenticatedUGI, "User cannot be empty or null"); - Validate.notEmpty(aclOwner, "User cannot be empty or null"); - Validate.notEmpty(aclGroup, "Group cannot be empty or null"); - - return isSuperUser(authenticatedUGI) - || (!isUserACLOwner(authenticatedUGI.getShortUserName(), aclOwner) - && isUserInGroup(aclGroup, authenticatedUGI)); - } - - /** - * Determines if the authenticated user is authorized to execute the action on the resource. - * Throws an exception if not authorized. - * - * @param resource api resource, admin, entities or instance - * @param action action being authorized on resource and entity if applicable - * @param entityType entity type in question, not for admin resource - * @param entityName entity name in question, not for admin resource - * @param authenticatedUGI proxy ugi for the authenticated user - * @throws org.apache.hadoop.security.authorize.AuthorizationException - */ - @Override - public void authorizeResource(String resource, String action, - String entityType, String entityName, - UserGroupInformation authenticatedUGI) - throws AuthorizationException, EntityNotRegisteredException { - - Validate.notEmpty(resource, "Resource cannot be empty or null"); - Validate.isTrue(RESOURCES.contains(resource), "Illegal resource: " + resource); - Validate.notEmpty(action, "Action cannot be empty or null"); - - try { - if (isSuperUser(authenticatedUGI)) { - return; - } - - if ("admin".equals(resource)) { - if (!("version".equals(action) || "clearuser".equals(action) || "getuser".equals(action))) { - authorizeAdminResource(authenticatedUGI, action); - } - } else if ("entities".equals(resource) || "instance".equals(resource)) { - authorizeEntityResource(authenticatedUGI, entityName, entityType, action); - } else if ("metadata".equals(resource)) { - authorizeMetadataResource(authenticatedUGI, action); - } - } catch (IOException e) { - throw new AuthorizationException(e); - } - } - - protected Set<String> getGroupNames(UserGroupInformation proxyUgi) { - return new HashSet<String>(Arrays.asList(proxyUgi.getGroupNames())); - } - - /** - * Determines if the authenticated user is authorized to execute the action on the entity. - * Throws an exception if not authorized. - * - * @param entityName entity in question, applicable for entities and instance resource - * @param entityType entity in question, applicable for entities and instance resource - * @param acl entity ACL - * @param action action being authorized on resource and entity if applicable - * @param authenticatedUGI proxy ugi for the authenticated user - * @throws org.apache.hadoop.security.authorize.AuthorizationException - */ - @Override - public void authorizeEntity(String entityName, String entityType, AccessControlList acl, - String action, UserGroupInformation authenticatedUGI) - throws AuthorizationException { - - try { - LOG.info("Authorizing authenticatedUser={}, action={}, entity={}, type{}", - authenticatedUGI.getShortUserName(), action, entityName, entityType); - - if (isSuperUser(authenticatedUGI)) { - return; - } - - checkUser(entityName, acl.getOwner(), acl.getGroup(), action, authenticatedUGI); - } catch (IOException e) { - throw new AuthorizationException(e); - } - } - - /** - * Validate if the entity owner is the logged-in authenticated user. - * - * @param entityName entity name. - * @param aclOwner entity ACL Owner. - * @param aclGroup entity ACL group. - * @param action action being authorized on resource and entity if applicable. - * @param authenticatedUGI proxy ugi for the authenticated user. - * @throws AuthorizationException - */ - protected void checkUser(String entityName, String aclOwner, String aclGroup, String action, - UserGroupInformation authenticatedUGI) throws AuthorizationException { - final String authenticatedUser = authenticatedUGI.getShortUserName(); - if (isUserACLOwner(authenticatedUser, aclOwner) - || isUserInGroup(aclGroup, authenticatedUGI)) { - return; - } - - StringBuilder message = new StringBuilder("Permission denied: authenticatedUser="); - message.append(authenticatedUser); - message.append(!authenticatedUser.equals(aclOwner) - ? " not entity owner=" + aclOwner - : " not in group=" + aclGroup); - message.append(", entity=").append(entityName).append(", action=").append(action); - - LOG.error(message.toString()); - throw new AuthorizationException(message.toString()); - } - - /** - * Determines if the authenticated user is the entity ACL owner. - * - * @param authenticatedUser authenticated user - * @param aclOwner entity ACL owner - * @return true if authenticated user is the entity acl owner, false otherwise. - */ - protected boolean isUserACLOwner(String authenticatedUser, String aclOwner) { - return authenticatedUser.equals(aclOwner); - } - - /** - * Checks if the user's group matches the entity ACL group. - * - * @param group Entity ACL group. - * @param proxyUgi proxy ugi for the authenticated user. - * @return true if user groups contains entity acl group. - */ - protected boolean isUserInGroup(String group, UserGroupInformation proxyUgi) { - Set<String> groups = getGroupNames(proxyUgi); - return groups.contains(group); - } - - /** - * Check if the user has admin privileges. - * - * @param authenticatedUGI proxy ugi for the authenticated user. - * @param action admin action on the resource. - * @throws AuthorizationException if the user does not have admin privileges. - */ - protected void authorizeAdminResource(UserGroupInformation authenticatedUGI, - String action) throws AuthorizationException { - final String authenticatedUser = authenticatedUGI.getShortUserName(); - LOG.debug("Authorizing user={} for admin, action={}", authenticatedUser, action); - if (adminUsers.contains(authenticatedUser) || isUserInAdminGroups(authenticatedUGI)) { - return; - } - - LOG.error("Permission denied: user {} does not have admin privilege for action={}", - authenticatedUser, action); - throw new AuthorizationException("Permission denied: user=" + authenticatedUser - + " does not have admin privilege for action=" + action); - } - - protected boolean isUserInAdminGroups(UserGroupInformation proxyUgi) { - final Set<String> groups = getGroupNames(proxyUgi); - groups.retainAll(adminGroups); - return !groups.isEmpty(); - } - - protected void authorizeEntityResource(UserGroupInformation authenticatedUGI, - String entityName, String entityType, - String action) - throws AuthorizationException, EntityNotRegisteredException { - - Validate.notEmpty(entityType, "Entity type cannot be empty or null"); - LOG.debug("Authorizing authenticatedUser={} against entity/instance action={}, " - + "entity name={}, entity type={}", - authenticatedUGI.getShortUserName(), action, entityName, entityType); - - if (entityName != null) { // lifecycle actions - Entity entity = getEntity(entityName, entityType); - authorizeEntity(entity.getName(), entity.getEntityType().name(), - entity.getACL(), action, authenticatedUGI); - } else { - // non lifecycle actions, lifecycle actions with null entity will validate later - LOG.info("Authorization for action={} will be done in the API", action); - } - } - - private Entity getEntity(String entityName, String entityType) - throws EntityNotRegisteredException, AuthorizationException { - - try { - EntityType type = EntityType.getEnum(entityType); - return EntityUtil.getEntity(type, entityName); - } catch (FalconException e) { - if (e instanceof EntityNotRegisteredException) { - throw (EntityNotRegisteredException) e; - } else { - throw new AuthorizationException(e); - } - } - } - - protected void authorizeMetadataResource(UserGroupInformation authenticatedUGI, - String action) throws AuthorizationException { - LOG.debug("User {} authorized for action {} ", authenticatedUGI.getShortUserName(), action); - // todo - read-only for all metadata but needs to be implemented - } -}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/security/SecurityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java deleted file mode 100644 index c187358..0000000 --- a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.security; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.util.ReflectionUtils; -import org.apache.falcon.util.StartupProperties; -import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; -import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; - -/** - * Security Util - bunch of security related helper methods. - */ -public final class SecurityUtil { - - /** - * Constant for the configuration property that indicates the prefix. - */ - private static final String CONFIG_PREFIX = "falcon.authentication."; - - /** - * Constant for the configuration property that indicates the authentication type. - */ - public static final String AUTHENTICATION_TYPE = CONFIG_PREFIX + "type"; - - /** - * Constant for the configuration property that indicates the Name node principal. - */ - public static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal"; - - /** - * Constant for the configuration property that indicates the Name node principal. - * This is used to talk to Hive Meta Store during parsing and validations only. - */ - public static final String HIVE_METASTORE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal"; - - public static final String METASTORE_USE_THRIFT_SASL = "hive.metastore.sasl.enabled"; - - public static final String METASTORE_PRINCIPAL = "hcat.metastore.principal"; - - private static final Logger LOG = LoggerFactory.getLogger(SecurityUtil.class); - - private SecurityUtil() { - } - - public static String getAuthenticationType() { - return StartupProperties.get().getProperty( - AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE); - } - - /** - * Checks if kerberos authentication is enabled in the configuration. - * - * @return true if falcon.authentication.type is kerberos, false otherwise - */ - public static boolean isSecurityEnabled() { - String authenticationType = StartupProperties.get().getProperty( - AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE); - - final boolean useKerberos; - if (authenticationType == null || PseudoAuthenticationHandler.TYPE.equals(authenticationType)) { - useKerberos = false; - } else if (KerberosAuthenticationHandler.TYPE.equals(authenticationType)) { - useKerberos = true; - } else { - throw new IllegalArgumentException("Invalid attribute value for " - + AUTHENTICATION_TYPE + " of " + authenticationType); - } - - return useKerberos; - } - - public static String getLocalHostName() throws UnknownHostException { - return InetAddress.getLocalHost().getCanonicalHostName(); - } - - /** - * Checks if authorization is enabled in the configuration. - * - * @return true if falcon.security.authorization.enabled is enabled, false otherwise - */ - public static boolean isAuthorizationEnabled() { - return Boolean.valueOf(StartupProperties.get().getProperty( - "falcon.security.authorization.enabled", "false")); - } - - public static AuthorizationProvider getAuthorizationProvider() throws FalconException { - String providerClassName = StartupProperties.get().getProperty( - "falcon.security.authorization.provider", - "org.apache.falcon.security.DefaultAuthorizationProvider"); - return ReflectionUtils.getInstanceByClassName(providerClassName); - } - - public static void tryProxy(Entity entity, final String doAsUser) throws IOException, FalconException { - if (entity != null && entity.getACL() != null && SecurityUtil.isAuthorizationEnabled()) { - final String aclOwner = entity.getACL().getOwner(); - final String aclGroup = entity.getACL().getGroup(); - - if (StringUtils.isNotEmpty(doAsUser)) { - if (!doAsUser.equalsIgnoreCase(aclOwner)) { - LOG.warn("doAs user {} not same as acl owner {}. Ignoring acl owner.", doAsUser, aclOwner); - throw new FalconException("doAs user and ACL owner mismatch. doAs user " + doAsUser - + " should be same as ACL owner " + aclOwner); - } - return; - } - if (SecurityUtil.getAuthorizationProvider().shouldProxy( - CurrentUser.getAuthenticatedUGI(), aclOwner, aclGroup)) { - CurrentUser.proxy(aclOwner, aclGroup); - } - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java b/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java deleted file mode 100644 index e20b0b5..0000000 --- a/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.service; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.Entity; - -/** - * Configuration change notification listener. - */ -public interface ConfigurationChangeListener { - - /** - * This is upon adding a new entity to Store. - * - * @param entity entity object - * @throws FalconException - */ - void onAdd(Entity entity) throws FalconException; - - /** - * This is upon removing an existing entity from the Store. - * - * @param entity entity object - * @throws FalconException - */ - void onRemove(Entity entity) throws FalconException; - - /** - * This is upon updating an entity to the store. - * - * @param oldEntity old entity object - * @param newEntity updated entity object - * @throws FalconException - */ - void onChange(Entity oldEntity, Entity newEntity) throws FalconException; - - /** - * This is when existing entities are read from the store during startup. - * - * @param entity entity object - * @throws FalconException - */ - void onReload(Entity entity) throws FalconException; -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/FalconService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/service/FalconService.java b/common/src/main/java/org/apache/falcon/service/FalconService.java deleted file mode 100644 index a1eb8e0..0000000 --- a/common/src/main/java/org/apache/falcon/service/FalconService.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.service; - -import org.apache.falcon.FalconException; - -/** - * Falcon service initialized at startup. - */ -public interface FalconService { - - String getName(); - - void init() throws FalconException; - - void destroy() throws FalconException; -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/GroupsService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/service/GroupsService.java b/common/src/main/java/org/apache/falcon/service/GroupsService.java deleted file mode 100644 index dd4d946..0000000 --- a/common/src/main/java/org/apache/falcon/service/GroupsService.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.service; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.Groups; - -import java.io.IOException; -import java.util.List; - -/** - * The GroupsService class delegates to the Hadoop's <code>org.apache.hadoop.security.Groups</code> - * to retrieve the groups a user belongs to. - */ -public class GroupsService implements FalconService { - private org.apache.hadoop.security.Groups hGroups; - - public static final String SERVICE_NAME = GroupsService.class.getSimpleName(); - - /** - * Initializes the service. - */ - @Override - public void init() { - hGroups = new Groups(new Configuration(true)); - } - - /** - * Destroys the service. - */ - @Override - public void destroy() { - } - - @Override - public String getName() { - return SERVICE_NAME; - } - - /** - * Returns the list of groups a user belongs to. - * - * @param user user name. - * @return the groups the given user belongs to. - * @throws java.io.IOException thrown if there was an error retrieving the groups of the user. - */ - public List<String> getGroups(String user) throws IOException { - return hGroups.getGroups(user); - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java b/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java deleted file mode 100644 index b8c979e..0000000 --- a/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.service; - -import org.apache.falcon.FalconException; -import org.apache.falcon.lifecycle.FeedLifecycleStage; -import org.apache.falcon.lifecycle.LifecyclePolicy; -import org.apache.falcon.util.ReflectionUtils; -import org.apache.falcon.util.StartupProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; - -/** - * Stores all internal and external feed lifecycle policies. - */ -public final class LifecyclePolicyMap implements FalconService { - private static final Logger LOG = LoggerFactory.getLogger(LifecyclePolicyMap.class); - private static final LifecyclePolicyMap STORE = new LifecyclePolicyMap(); - - private final Map<String, LifecyclePolicy> policyMap = new HashMap<>(); - - private LifecyclePolicyMap() {} - - public static LifecyclePolicyMap get() { - return STORE; - } - - public LifecyclePolicy get(String policyName) { - return policyMap.get(policyName); - } - - @Override - public String getName() { - return getClass().getSimpleName(); - } - - @Override - public void init() throws FalconException { - String[] policyNames = StartupProperties.get().getProperty("falcon.feed.lifecycle.policies").split(","); - for (String name : policyNames) { - LifecyclePolicy policy = ReflectionUtils.getInstanceByClassName(name); - LOG.debug("Loaded policy : {} for stage : {}", policy.getName(), policy.getStage()); - policyMap.put(policy.getName(), policy); - } - validate(); - } - - @Override - public void destroy() throws FalconException { - policyMap.clear(); - } - - // validate that default policy for each stage is available - private void validate() throws FalconException { - for (FeedLifecycleStage stage : FeedLifecycleStage.values()) { - if (!policyMap.containsKey(stage.getDefaultPolicyName())) { - throw new FalconException("Default Policy: " + stage.getDefaultPolicyName() - + " for stage: " + stage.name() + "was not found."); - } - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/LogCleanupService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java b/common/src/main/java/org/apache/falcon/service/LogCleanupService.java deleted file mode 100644 index 9962102..0000000 --- a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.service; - -import java.util.Date; -import java.util.Timer; -import java.util.TimerTask; - -import javax.servlet.jsp.el.ELException; -import javax.servlet.jsp.el.ExpressionEvaluator; - -import org.apache.commons.el.ExpressionEvaluatorImpl; -import org.apache.falcon.FalconException; -import org.apache.falcon.aspect.GenericAlert; -import org.apache.falcon.cleanup.AbstractCleanupHandler; -import org.apache.falcon.cleanup.FeedCleanupHandler; -import org.apache.falcon.cleanup.ProcessCleanupHandler; -import org.apache.falcon.expression.ExpressionHelper; -import org.apache.falcon.util.StartupProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Log cleanup service. - */ -public class LogCleanupService implements FalconService { - - private static final Logger LOG = LoggerFactory.getLogger(LogCleanupService.class); - private final ExpressionEvaluator evaluator = new ExpressionEvaluatorImpl(); - private final ExpressionHelper resolver = ExpressionHelper.get(); - - @Override - public String getName() { - return "Falcon Log cleanup service"; - } - - @Override - public void init() throws FalconException { - Timer timer = new Timer(); - timer.schedule(new CleanupThread(), 0, getDelay()); - LOG.info("Falcon log cleanup service initialized"); - } - - private static class CleanupThread extends TimerTask { - - private final AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler(); - private final AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler(); - - @Override - public void run() { - try { - LOG.info("Cleaning up logs at: {}", new Date()); - processCleanupHandler.cleanup(); - feedCleanupHandler.cleanup(); - } catch (Throwable t) { - LOG.error("Error in cleanup task: ", t); - GenericAlert.alertLogCleanupServiceFailed( - "Exception in log cleanup service", t); - } - } - } - - @Override - public void destroy() throws FalconException { - LOG.info("Falcon log cleanup service destroyed"); - } - - private long getDelay() throws FalconException { - String delay = StartupProperties.get().getProperty( - "falcon.cleanup.service.frequency", "days(1)"); - try { - return (Long) evaluator.evaluate("${" + delay + "}", Long.class, - resolver, resolver); - } catch (ELException e) { - throw new FalconException("Exception in EL evaluation", e); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/ProxyUserService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/service/ProxyUserService.java b/common/src/main/java/org/apache/falcon/service/ProxyUserService.java deleted file mode 100644 index 364c750..0000000 --- a/common/src/main/java/org/apache/falcon/service/ProxyUserService.java +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.service; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.util.RuntimeProperties; - -import java.io.IOException; -import java.net.InetAddress; -import java.security.AccessControlException; -import java.text.MessageFormat; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The ProxyUserService checks if a user of a request has proxyuser privileges. - * <p> - * This check is based on the following criteria: - * <p> - * <ul> - * <li>The user of the request must be configured as proxy user in Falcon runtime properties.</li> - * <li>The user of the request must be making the request from a whitelisted host.</li> - * <li>The user of the request must be making the request on behalf of a user of a whitelisted group.</li> - * </ul> - * <p> - */ -public class ProxyUserService implements FalconService { - private static final Logger LOG = LoggerFactory.getLogger(ProxyUserService.class); - - - private Map<String, Set<String>> proxyUserHosts = new HashMap<>(); - private Map<String, Set<String>> proxyUserGroups = new HashMap<>(); - - private static final String CONF_PREFIX = "falcon.service.ProxyUserService.proxyuser."; - private static final String GROUPS = ".groups"; - private static final String HOSTS = ".hosts"; - public static final String SERVICE_NAME = ProxyUserService.class.getSimpleName(); - - @Override - public String getName() { - return SERVICE_NAME; - } - - /** - * Initializes the service. - * @throws FalconException thrown if the service could not be configured correctly. - */ - @Override - public void init() throws FalconException { - Set<Map.Entry<Object, Object>> entrySet = RuntimeProperties.get().entrySet(); - - for (Map.Entry<Object, Object> entry : entrySet) { - String key = (String) entry.getKey(); - - if (key.startsWith(CONF_PREFIX) && key.endsWith(GROUPS)) { - String proxyUser = key.substring(0, key.lastIndexOf(GROUPS)); - if (RuntimeProperties.get().getProperty(proxyUser + HOSTS) == null) { - throw new FalconException(proxyUser + HOSTS + " property not set in runtime " - + "properties. Please add it."); - } - proxyUser = proxyUser.substring(CONF_PREFIX.length()); - String value = ((String) entry.getValue()).trim(); - LOG.info("Loading proxyuser settings [{}]=[{}]", key, value); - Set<String> values = null; - if (!value.equals("*")) { - values = new HashSet<>(Arrays.asList(value.split(","))); - } - proxyUserGroups.put(proxyUser, values); - } - if (key.startsWith(CONF_PREFIX) && key.endsWith(HOSTS)) { - String proxyUser = key.substring(0, key.lastIndexOf(HOSTS)); - if (RuntimeProperties.get().getProperty(proxyUser + GROUPS) == null) { - throw new FalconException(proxyUser + GROUPS + " property not set in runtime " - + "properties. Please add it."); - } - proxyUser = proxyUser.substring(CONF_PREFIX.length()); - String value = ((String) entry.getValue()).trim(); - LOG.info("Loading proxyuser settings [{}]=[{}]", key, value); - Set<String> values = null; - if (!value.equals("*")) { - String[] hosts = value.split(","); - for (int i = 0; i < hosts.length; i++) { - String hostName = hosts[i]; - try { - hosts[i] = normalizeHostname(hostName); - } catch (Exception ex) { - throw new FalconException("Exception normalizing host name: " + hostName + "." - + ex.getMessage(), ex); - } - LOG.info("Hostname, original [{}], normalized [{}]", hostName, hosts[i]); - } - values = new HashSet<>(Arrays.asList(hosts)); - } - proxyUserHosts.put(proxyUser, values); - } - } - } - - /** - * Verifies a proxyuser. - * - * @param proxyUser user name of the proxy user. - * @param proxyHost host the proxy user is making the request from. - * @param doAsUser user the proxy user is impersonating. - * @throws java.io.IOException thrown if an error during the validation has occurred. - * @throws java.security.AccessControlException thrown if the user is not allowed to perform the proxyuser request. - */ - public void validate(String proxyUser, String proxyHost, String doAsUser) throws IOException { - validateNotEmpty(proxyUser, "proxyUser", - "If you're attempting to use user-impersonation via a proxy user, please make sure that " - + "falcon.service.ProxyUserService.proxyuser.#USER#.hosts and " - + "falcon.service.ProxyUserService.proxyuser.#USER#.groups are configured correctly" - ); - validateNotEmpty(proxyHost, "proxyHost", - "If you're attempting to use user-impersonation via a proxy user, please make sure that " - + "falcon.service.ProxyUserService.proxyuser." + proxyUser + ".hosts and " - + "falcon.service.ProxyUserService.proxyuser." + proxyUser + ".groups are configured correctly" - ); - validateNotEmpty(doAsUser, "doAsUser", null); - LOG.debug("Authorization check proxyuser [{}] host [{}] doAs [{}]", - proxyUser, proxyHost, doAsUser); - if (proxyUserHosts.containsKey(proxyUser)) { - validateRequestorHost(proxyUser, proxyHost, proxyUserHosts.get(proxyUser)); - validateGroup(proxyUser, doAsUser, proxyUserGroups.get(proxyUser)); - } else { - throw new AccessControlException(MessageFormat.format("User [{0}] not defined as proxyuser. Please add it" - + " to runtime properties.", proxyUser)); - } - } - - private void validateRequestorHost(String proxyUser, String hostname, Set<String> validHosts) - throws IOException { - if (validHosts != null) { - if (!validHosts.contains(hostname) && !validHosts.contains(normalizeHostname(hostname))) { - throw new AccessControlException(MessageFormat.format("Unauthorized host [{0}] for proxyuser [{1}]", - hostname, proxyUser)); - } - } - } - - private void validateGroup(String proxyUser, String user, Set<String> validGroups) throws IOException { - if (validGroups != null) { - List<String> userGroups = Services.get().<GroupsService>getService(GroupsService.SERVICE_NAME) - .getGroups(user); - for (String g : validGroups) { - if (userGroups.contains(g)) { - return; - } - } - throw new AccessControlException( - MessageFormat.format("Unauthorized proxyuser [{0}] for user [{1}], not in proxyuser groups", - proxyUser, user)); - } - } - - private String normalizeHostname(String name) { - try { - InetAddress address = InetAddress.getByName(name); - return address.getCanonicalHostName(); - } catch (IOException ex) { - throw new AccessControlException(MessageFormat.format("Could not resolve host [{0}], [{1}]", name, - ex.getMessage())); - } - } - - private static void validateNotEmpty(String str, String name, String info) { - if (StringUtils.isBlank(str)) { - throw new IllegalArgumentException(name + " cannot be null or empty" + (info == null ? "" : ", " + info)); - } - } - - /** - * Destroys the service. - */ - @Override - public void destroy() { - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java b/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java deleted file mode 100644 index 4708b94..0000000 --- a/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.service; - -import org.apache.falcon.FalconException; -import org.apache.falcon.util.ReflectionUtils; -import org.apache.falcon.util.StartupProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Initializer that Falcon uses at startup to bring up all the falcon startup services. - */ -public class ServiceInitializer { - - private static final Logger LOG = LoggerFactory.getLogger(ServiceInitializer.class); - private final Services services = Services.get(); - - public void initialize() throws FalconException { - String serviceClassNames = StartupProperties.get(). - getProperty("application.services", "org.apache.falcon.entity.store.ConfigurationStore"); - for (String serviceClassName : serviceClassNames.split(",")) { - serviceClassName = serviceClassName.trim(); - if (serviceClassName.isEmpty()) { - continue; - } - FalconService service = ReflectionUtils.getInstanceByClassName(serviceClassName); - services.register(service); - LOG.info("Initializing service: {}", serviceClassName); - try { - service.init(); - } catch (Throwable t) { - LOG.error("Failed to initialize service {}", serviceClassName, t); - throw new FalconException(t); - } - LOG.info("Service initialized: {}", serviceClassName); - } - } - - public void destroy() throws FalconException { - for (FalconService service : services) { - LOG.info("Destroying service: {}", service.getClass().getName()); - try { - service.destroy(); - } catch (Throwable t) { - LOG.error("Failed to destroy service {}", service.getClass().getName(), t); - throw new FalconException(t); - } - LOG.info("Service destroyed: {}", service.getClass().getName()); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/service/Services.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/service/Services.java b/common/src/main/java/org/apache/falcon/service/Services.java deleted file mode 100644 index 6659ccd..0000000 --- a/common/src/main/java/org/apache/falcon/service/Services.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.service; - -import org.apache.falcon.FalconException; -import org.apache.falcon.util.ReflectionUtils; - -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.NoSuchElementException; - -/** - * Repository of services initialized at startup. - */ -public final class Services implements Iterable<FalconService> { - - private static final Services INSTANCE = new Services(); - - private Services() { - } - - public static Services get() { - return INSTANCE; - } - - private final Map<String, FalconService> services = - new LinkedHashMap<String, FalconService>(); - - public synchronized void register(FalconService service) - throws FalconException { - - if (services.containsKey(service.getName())) { - throw new FalconException("Service " + service.getName() + " already registered"); - } else { - services.put(service.getName(), service); - } - } - - @SuppressWarnings("unchecked") - public <T extends FalconService> T getService(String serviceName) { - if (services.containsKey(serviceName)) { - return (T) services.get(serviceName); - } else { - throw new NoSuchElementException("Service " + serviceName + " not registered with registry"); - } - } - - public boolean isRegistered(String serviceName) { - return services.containsKey(serviceName); - } - - @Override - public Iterator<FalconService> iterator() { - return services.values().iterator(); - } - - public FalconService init(String serviceName) throws FalconException { - if (isRegistered(serviceName)) { - throw new FalconException("Service is already initialized " + serviceName); - } - FalconService service = ReflectionUtils.getInstance(serviceName + ".impl"); - register(service); - return service; - } - - public void reset() { - services.clear(); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/update/UpdateHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java deleted file mode 100644 index 6603bc6..0000000 --- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.update; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.ProcessHelper; -import org.apache.falcon.entity.Storage; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Cluster; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; - -/** - * Helper methods to facilitate entity updates. - */ -public final class UpdateHelper { - private static final Logger LOG = LoggerFactory.getLogger(UpdateHelper.class); - - private static final String[] FEED_FIELDS = new String[]{"partitions", "groups", "lateArrival.cutOff", - "schema.location", "schema.provider", "tags", - "group", "owner", "permission", }; - private static final String[] PROCESS_FIELDS = new String[]{"retry.policy", "retry.delay", "retry.attempts", - "lateProcess.policy", "lateProcess.delay", - "lateProcess.lateInputs[\\d+].input", - "lateProcess.lateInputs[\\d+].workflowPath", - "owner", "group", "permission", "tags", - "pipelines", }; - - private UpdateHelper() {} - - public static boolean isEntityUpdated(Entity oldEntity, Entity newEntity, String cluster, - Path oldStagingPath) throws FalconException { - Entity oldView = EntityUtil.getClusterView(oldEntity, cluster); - Entity newView = EntityUtil.getClusterView(newEntity, cluster); - - //staging path contains md5 of the cluster view of entity - String[] parts = oldStagingPath.getName().split("_"); - if (parts[0].equals(EntityUtil.md5(newView))) { - return false; - } - - switch (oldEntity.getEntityType()) { - case FEED: - return !EntityUtil.equals(oldView, newView, FEED_FIELDS); - - case PROCESS: - return !EntityUtil.equals(oldView, newView, PROCESS_FIELDS); - - default: - } - throw new IllegalArgumentException("Unhandled entity type " + oldEntity.getEntityType()); - } - - public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, Entity affectedEntity, String cluster) - throws FalconException { - if (oldEntity.getEntityType() == EntityType.FEED && affectedEntity.getEntityType() == EntityType.PROCESS) { - - Feed oldFeed = (Feed) oldEntity; - Feed newFeed = (Feed) newEntity; - Process affectedProcess = (Process) affectedEntity; - - //check if affectedProcess is defined for this cluster - Cluster processCluster = ProcessHelper.getCluster(affectedProcess, cluster); - if (processCluster == null) { - LOG.debug("Process {} is not defined for cluster {}. Skipping", affectedProcess.getName(), cluster); - return false; - } - - if (processCluster.getValidity().getEnd().before(new Date())) { - LOG.debug("Process {} validity {} is in the past. Skipping...", affectedProcess.getName(), - processCluster.getValidity().getEnd()); - return false; - } - - if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) { - LOG.debug("{}: Frequency has changed. Updating...", oldFeed.toShortString()); - return true; - } - - if (!StringUtils.equals(oldFeed.getAvailabilityFlag(), newFeed.getAvailabilityFlag())) { - LOG.debug("{}: Availability flag has changed. Updating...", oldFeed.toShortString()); - return true; - } - - org.apache.falcon.entity.v0.feed.Cluster oldFeedCluster = FeedHelper.getCluster(oldFeed, cluster); - org.apache.falcon.entity.v0.feed.Cluster newFeedCluster = FeedHelper.getCluster(newFeed, cluster); - if (!oldFeedCluster.getValidity().getStart().equals(newFeedCluster.getValidity().getStart())) { - LOG.debug("{}: Start time for cluster {} has changed. Updating...", oldFeed.toShortString(), cluster); - return true; - } - - Storage oldFeedStorage = FeedHelper.createStorage(cluster, oldFeed); - Storage newFeedStorage = FeedHelper.createStorage(cluster, newFeed); - - if (!oldFeedStorage.isIdentical(newFeedStorage)) { - LOG.debug("{}: Storage has changed. Updating...", oldFeed.toShortString()); - return true; - } - return false; - - } else { - LOG.debug(newEntity.toShortString()); - LOG.debug(affectedEntity.toShortString()); - throw new FalconException("Don't know what to do. Unexpected scenario"); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java b/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java deleted file mode 100644 index adf09c4..0000000 --- a/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.expression.ExpressionHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; - -/** - * Base class for reading application properties. - */ -public abstract class ApplicationProperties extends Properties { - - private static final Logger LOG = LoggerFactory.getLogger(ApplicationProperties.class); - - protected abstract String getPropertyFile(); - - protected String domain; - - protected ApplicationProperties() throws FalconException { - init(); - } - - protected void init() throws FalconException { - setDomain(System.getProperty("falcon.domain", System.getenv("FALCON_DOMAIN"))); - loadProperties(); - } - - protected void setDomain(String domain) { - this.domain = domain; - } - - public String getDomain() { - return domain; - } - - protected void loadProperties() throws FalconException { - String propertyFileName = getPropertyFile(); - String confDir = System.getProperty("config.location"); - loadProperties(propertyFileName, confDir); - } - - /** - * This method reads the given properties file in the following order: - * config.location & classpath. It falls back in that specific order. - * - * @throws FalconException - */ - protected void loadProperties(String propertyFileName, String confDir) throws FalconException { - try { - InputStream resourceAsStream = checkConfigLocation(propertyFileName, confDir); - - //Fallback to classpath - if (resourceAsStream == null) { - resourceAsStream = checkClassPath(propertyFileName); - } - - if (resourceAsStream != null) { - try { - doLoadProperties(resourceAsStream); - return; - } finally { - IOUtils.closeQuietly(resourceAsStream); - } - } - throw new FileNotFoundException("Unable to find: " + propertyFileName); - } catch (IOException e) { - throw new FalconException("Error loading properties file: " + getPropertyFile(), e); - } - } - - private InputStream checkConfigLocation(String propertyFileName, String confDir) - throws FileNotFoundException { - - InputStream resourceAsStream = null; - if (confDir != null) { - File fileToLoad = new File(confDir, propertyFileName); - resourceAsStream = getResourceAsStream(fileToLoad); - } - return resourceAsStream; - } - - protected InputStream getResourceAsStream(File fileToLoad) throws FileNotFoundException { - InputStream resourceAsStream = null; - if (fileToLoad.exists() && fileToLoad.isFile() && fileToLoad.canRead()) { - LOG.info("config.location is set, using: {}", fileToLoad.getAbsolutePath()); - resourceAsStream = new FileInputStream(fileToLoad); - } - return resourceAsStream; - } - - protected InputStream checkClassPath(String propertyFileName) { - - InputStream resourceAsStream = null; - Class clazz = ApplicationProperties.class; - URL resource = clazz.getResource("/" + propertyFileName); - if (resource != null) { - LOG.info("Fallback to classpath for: {}", resource); - resourceAsStream = clazz.getResourceAsStream("/" + propertyFileName); - } else { - resource = clazz.getResource(propertyFileName); - if (resource != null) { - LOG.info("Fallback to classpath for: {}", resource); - resourceAsStream = clazz.getResourceAsStream(propertyFileName); - } - } - return resourceAsStream; - } - - private void doLoadProperties(InputStream resourceAsStream) throws IOException, FalconException { - Properties origProps = new Properties(); - origProps.load(resourceAsStream); - if (domain == null) { - domain = origProps.getProperty("*.domain"); - if (domain == null) { - throw new FalconException("Domain is not set!"); - } else { - domain = ExpressionHelper.substitute(domain); - } - } - - LOG.info("Initializing {} properties with domain {}", this.getClass().getName(), domain); - Set<String> keys = getKeys(origProps.keySet()); - for (String key : keys) { - String value = origProps.getProperty(domain + "." + key, origProps.getProperty("*." + key)); - if (value != null) { - value = ExpressionHelper.substitute(value); - LOG.debug("{}={}", key, value); - put(key, value); - } - } - } - - protected Set<String> getKeys(Set<Object> keySet) { - Set<String> keys = new HashSet<String>(); - for (Object keyObj : keySet) { - String key = (String) keyObj; - keys.add(key.substring(key.indexOf('.') + 1)); - } - return keys; - } - - @Override - public String getProperty(String key) { - return StringUtils.trim(super.getProperty(key)); - } - - @Override - public String getProperty(String key, String defaultValue) { - return StringUtils.trim(super.getProperty(key, defaultValue)); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/BuildProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/BuildProperties.java b/common/src/main/java/org/apache/falcon/util/BuildProperties.java deleted file mode 100644 index 339dcb5..0000000 --- a/common/src/main/java/org/apache/falcon/util/BuildProperties.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.falcon.FalconException; - -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Application build info properties are exposed through this. - */ -public final class BuildProperties extends ApplicationProperties { - private static final String PROPERTY_FILE = "falcon-buildinfo.properties"; - - private static final AtomicReference<BuildProperties> INSTANCE = - new AtomicReference<BuildProperties>(); - - private BuildProperties() throws FalconException { - super(); - } - - @Override - protected String getPropertyFile() { - return PROPERTY_FILE; - } - - public static Properties get() { - try { - if (INSTANCE.get() == null) { - INSTANCE.compareAndSet(null, new BuildProperties()); - } - return INSTANCE.get(); - } catch (FalconException e) { - throw new RuntimeException("Unable to read application " - + "falcon build information properties", e); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/DateUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java deleted file mode 100644 index baf5b13..0000000 --- a/common/src/main/java/org/apache/falcon/util/DateUtil.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.util; - -import org.apache.falcon.entity.v0.SchemaHelper; -import org.apache.falcon.entity.v0.Frequency; - -import java.util.Calendar; -import java.util.Date; -import java.util.TimeZone; - -/** - * Helper to get date operations. - */ -public final class DateUtil { - - private static final long MINUTE_IN_MS = 60 * 1000L; - private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS; - private static final long DAY_IN_MS = 24 * HOUR_IN_MS; - private static final long MONTH_IN_MS = 31 * DAY_IN_MS; - - //Friday, April 16, 9999 7:12:55 AM UTC corresponding date - public static final Date NEVER = new Date(Long.parseLong("253379862775000")); - - public static final long HOUR_IN_MILLIS = 60 * 60 * 1000; - - private DateUtil() {} - - public static Date getNextMinute(Date time) throws Exception { - Calendar insCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - insCal.setTime(time); - insCal.add(Calendar.MINUTE, 1); - return insCal.getTime(); - - } - - public static String getDateFormatFromTime(long milliSeconds) { - return SchemaHelper.getDateFormat().format((new Date(milliSeconds))); - } - - /** - * This function should not be used for scheduling related functions as it may cause correctness issues in those - * scenarios. - * @param frequency - * @return - */ - public static Long getFrequencyInMillis(Frequency frequency){ - switch (frequency.getTimeUnit()) { - - case months: - return MONTH_IN_MS * frequency.getFrequencyAsInt(); - - case days: - return DAY_IN_MS * frequency.getFrequencyAsInt(); - - case hours: - return HOUR_IN_MS * frequency.getFrequencyAsInt(); - - case minutes: - return MINUTE_IN_MS * frequency.getFrequencyAsInt(); - - default: - return null; - } - } - - /** - * Returns the current time, with seconds and milliseconds reset to 0. - * @return - */ - public static Date now() { - Calendar cal = Calendar.getInstance(); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - return cal.getTime(); - } - - /** - * Adds the supplied number of seconds to the given date and returns the new Date. - * @param date - * @param seconds - * @return - */ - public static Date offsetTime(Date date, int seconds) { - return new Date(1000L * seconds + date.getTime()); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java b/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java deleted file mode 100644 index 5879f30..0000000 --- a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.falcon.FalconException; - -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Application deployment properties. particularly relating to - * whether the server is in embedded mode or distributed mode. - */ -public final class DeploymentProperties extends ApplicationProperties { - private static final String PROPERTY_FILE = "deploy.properties"; - - private static final AtomicReference<DeploymentProperties> INSTANCE = - new AtomicReference<>(); - - private DeploymentProperties() throws FalconException { - super(); - } - - @Override - protected String getPropertyFile() { - return PROPERTY_FILE; - } - - public static Properties get() { - try { - if (INSTANCE.get() == null) { - INSTANCE.compareAndSet(null, new DeploymentProperties()); - } - return INSTANCE.get(); - } catch (FalconException e) { - throw new RuntimeException("Unable to read application " + "startup properties", e); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java b/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java deleted file mode 100644 index 561520c..0000000 --- a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.util; - -import org.apache.falcon.entity.ColoClusterRelation; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; - -/** - * Helper methods to deployment properties. - */ -public final class DeploymentUtil { - private static final Logger LOG = LoggerFactory.getLogger(DeploymentUtil.class); - - protected static final String DEFAULT_COLO = "default"; - protected static final String EMBEDDED = "embedded"; - protected static final String DEPLOY_MODE = "deploy.mode"; - private static final Set<String> DEFAULT_ALL_COLOS = new HashSet<String>(); - - protected static final String CURRENT_COLO; - protected static final boolean EMBEDDED_MODE; - private static boolean prism = false; - - static { - DEFAULT_ALL_COLOS.add(DEFAULT_COLO); - EMBEDDED_MODE = DeploymentProperties.get(). - getProperty(DEPLOY_MODE, EMBEDDED).equals(EMBEDDED); - if (EMBEDDED_MODE) { - CURRENT_COLO = DEFAULT_COLO; - } else { - CURRENT_COLO = StartupProperties.get(). - getProperty("current.colo", DEFAULT_COLO); - } - LOG.info("Running in embedded mode? {}", EMBEDDED_MODE); - LOG.info("Current colo: {}", CURRENT_COLO); - } - - private DeploymentUtil() {} - - public static void setPrismMode() { - prism = true; - } - - public static boolean isPrism() { - return !EMBEDDED_MODE && prism; - } - - public static String getCurrentColo() { - return CURRENT_COLO; - } - - public static Set<String> getCurrentClusters() { - // return all clusters in embedded mode - if (EMBEDDED_MODE) { - Collection<String> allClusters = ConfigurationStore.get().getEntities(EntityType.CLUSTER); - Set<String> result = new HashSet<>(allClusters); - return result; - } - String colo = getCurrentColo(); - return ColoClusterRelation.get().getClusters(colo); - } - - public static boolean isEmbeddedMode() { - return EMBEDDED_MODE; - } - - public static String getDefaultColo() { - return DEFAULT_COLO; - } - - public static Set<String> getDefaultColos() { - DEFAULT_ALL_COLOS.add(DEFAULT_COLO); - return DEFAULT_ALL_COLOS; - } -}
