http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java deleted file mode 100644 index bbfca68..0000000 --- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.falcon.entity;

import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.resource.SchedulableEntityInstance;

import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Helper methods for accessing process members.
 */
public final class ProcessHelper {

    private ProcessHelper() {}

    /**
     * Finds the process-level cluster definition with the given name.
     *
     * @param process process whose cluster list is searched
     * @param clusterName name to look for
     * @return the matching cluster, or {@code null} when the process declares no cluster with that name
     */
    public static Cluster getCluster(Process process, String clusterName) {
        for (Cluster cluster : process.getClusters().getClusters()) {
            if (cluster.getName().equals(clusterName)) {
                return cluster;
            }
        }
        return null;
    }

    /**
     * Returns the workflow name to use for a process.
     *
     * @param workflowName explicitly configured workflow name, may be empty or {@code null}
     * @param processName name of the owning process
     * @return {@code workflowName} when it is non-empty, otherwise {@code processName + "-workflow"}
     */
    public static String getProcessWorkflowName(String workflowName, String processName) {
        return StringUtils.isEmpty(workflowName) ? processName + "-workflow" : workflowName;
    }

    /**
     * Determines the storage type used by the feeds of a process on the given cluster.
     * Input feeds are inspected first; output feeds are only inspected when the inputs
     * resolved to {@code FILESYSTEM}. The scan stops at the first {@code TABLE} feed.
     *
     * @param cluster cluster entity the feeds are resolved against
     * @param process process whose inputs and outputs are inspected
     * @return {@code FILESYSTEM} when the process has neither inputs nor outputs, otherwise the
     *         storage type resolved from its feeds
     * @throws FalconException if a referenced feed or its storage type cannot be resolved
     */
    public static Storage.TYPE getStorageType(org.apache.falcon.entity.v0.cluster.Cluster cluster,
                                              Process process) throws FalconException {
        Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
        if (process.getInputs() == null && process.getOutputs() == null) {
            return storageType;
        }

        if (process.getInputs() != null) {
            for (Input input : process.getInputs().getInputs()) {
                Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
                storageType = FeedHelper.getStorageType(feed, cluster);
                if (Storage.TYPE.TABLE == storageType) {
                    break;
                }
            }
        }

        // If input feeds storage type is file system check storage type of output feeds
        if (process.getOutputs() != null && Storage.TYPE.FILESYSTEM == storageType) {
            for (Output output : process.getOutputs().getOutputs()) {
                Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
                storageType = FeedHelper.getStorageType(feed, cluster);
                if (Storage.TYPE.TABLE == storageType) {
                    break;
                }
            }
        }

        return storageType;
    }

    /**
     * Checks that {@code instanceTime} denotes a real instance of {@code process} on {@code cluster}.
     *
     * @throws IllegalArgumentException if the cluster is not one of the process clusters, if the time
     *         lies outside the [start, end) validity window, or if it is not aligned with the
     *         process start time and frequency
     */
    private static void validateProcessInstance(Process process, Date instanceTime,
                                                org.apache.falcon.entity.v0.cluster.Cluster cluster) {
        //validate the cluster
        Cluster processCluster = getCluster(process, cluster.getName());
        if (processCluster == null) {
            throw new IllegalArgumentException("Cluster provided: " + cluster.getName()
                    + " is not a valid cluster for the process: " + process.getName());
        }

        // check if instanceTime is in validity range
        if (instanceTime.before(processCluster.getValidity().getStart())
                || !instanceTime.before(processCluster.getValidity().getEnd())) {
            // the separator before "on cluster" was missing, which glued the process name to the text
            throw new IllegalArgumentException("Instance time provided: " + instanceTime
                    + " is not in validity range of process: " + process.getName()
                    + " on cluster: " + cluster.getName());
        }

        // check instanceTime is valid on the basis of startTime and frequency
        Date nextInstance = EntityUtil.getNextStartTime(processCluster.getValidity().getStart(),
                process.getFrequency(), process.getTimezone(), instanceTime);
        if (!nextInstance.equals(instanceTime)) {
            throw new IllegalArgumentException("Instance time provided: " + instanceTime
                    + " for process: " + process.getName() + " is not a valid instance time on cluster: "
                    + cluster.getName() + " on the basis of startDate and frequency");
        }
    }

    /**
     * Given a process instance, returns the feed instances which are used as input for this process instance.
     *
     * @param process given process
     * @param instanceTime nominal time of the process instance
     * @param cluster - cluster for the process instance
     * @param allowOptionalFeeds switch to indicate whether optional feeds should be considered in input feeds.
     * @return Set of input feed instances which are consumed by the given process instance.
     * @throws org.apache.falcon.FalconException if a feed lookup or expression evaluation fails
     * @throws IllegalArgumentException if the cluster or instance time is not valid for the process
     */
    public static Set<SchedulableEntityInstance> getInputFeedInstances(Process process, Date instanceTime,
               org.apache.falcon.entity.v0.cluster.Cluster cluster, boolean allowOptionalFeeds) throws FalconException {

        // validate the inputs
        validateProcessInstance(process, instanceTime, cluster);

        Set<SchedulableEntityInstance> result = new HashSet<>();
        if (process.getInputs() != null) {
            ConfigurationStore store = ConfigurationStore.get();
            for (Input i : process.getInputs().getInputs()) {
                if (i.isOptional() && !allowOptionalFeeds) {
                    continue;
                }
                Feed feed = store.get(EntityType.FEED, i.getFeed());
                // inputStart is process instance time + (now - startTime)
                ExpressionHelper evaluator = ExpressionHelper.get();
                ExpressionHelper.setReferenceDate(instanceTime);
                Date inputInstanceStartDate = evaluator.evaluate(i.getStart(), Date.class);
                Date inputInstanceEndDate = evaluator.evaluate(i.getEnd(), Date.class);
                List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(),
                        inputInstanceStartDate, inputInstanceEndDate);
                SchedulableEntityInstance instance;
                for (Date time : instanceTimes) {
                    instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), time, EntityType.FEED);
                    instance.setTags(SchedulableEntityInstance.INPUT);
                    result.add(instance);
                }
            }
        }
        return result;
    }

    /**
     * Given a process instance, returns the feed instances it produces as output.
     *
     * @param process given process
     * @param instanceTime nominal time of the process instance
     * @param cluster cluster for the process instance
     * @return Set of output feed instances, one per declared output; empty when the process has no outputs
     * @throws FalconException if a feed lookup or expression evaluation fails, or if an output feed
     *         has no definition for the given cluster
     * @throws IllegalArgumentException if the cluster or instance time is not valid for the process
     */
    public static Set<SchedulableEntityInstance> getOutputFeedInstances(Process process, Date instanceTime,
                                        org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
        Set<SchedulableEntityInstance> result = new HashSet<>();

        // validate the inputs
        validateProcessInstance(process, instanceTime, cluster);

        if (process.getOutputs() != null && process.getOutputs().getOutputs() != null) {

            ExpressionHelper.setReferenceDate(instanceTime);
            ExpressionHelper evaluator = ExpressionHelper.get();
            SchedulableEntityInstance candidate;
            ConfigurationStore store = ConfigurationStore.get();
            for (Output output : process.getOutputs().getOutputs()) {

                Date outputInstance = evaluator.evaluate(output.getInstance(), Date.class);
                // find the feed
                Feed feed = store.get(EntityType.FEED, output.getFeed());
                org.apache.falcon.entity.v0.feed.Cluster fCluster = FeedHelper.getCluster(feed, cluster.getName());
                // NOTE(review): FeedHelper.getCluster presumably returns null for an unknown cluster,
                // like getCluster above - fail with a descriptive error instead of a bare NPE.
                if (fCluster == null) {
                    throw new FalconException("Feed: " + output.getFeed()
                            + " is not defined on cluster: " + cluster.getName());
                }
                outputInstance = EntityUtil.getPreviousInstanceTime(fCluster.getValidity().getStart(),
                        feed.getFrequency(), feed.getTimezone(), outputInstance);
                candidate = new SchedulableEntityInstance(output.getFeed(), cluster.getName(), outputInstance,
                        EntityType.FEED);
                candidate.setTags(SchedulableEntityInstance.OUTPUT);
                result.add(candidate);
            }
        }
        return result;
    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/Storage.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java deleted file mode 100644 index 3dc8f67..0000000 --- a/common/src/main/java/org/apache/falcon/entity/Storage.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.AccessControlList; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.fs.Path; - -import java.util.Date; -import java.util.List; - -/** - * A class to encapsulate the storage for a given feed which can either be - * expressed as a path on the file system or a table in a catalog. 
- */ -public interface Storage extends Configurable { - - String DOLLAR_EXPR_START_REGEX = "\\$\\{"; - String QUESTION_EXPR_START_REGEX = "\\?\\{"; - String EXPR_CLOSE_REGEX = "\\}"; - - /** - * URI Friendly expression. - */ - String DOLLAR_EXPR_START_NORMALIZED = "_D__START_"; - String EXPR_CLOSE_NORMALIZED = "_CLOSE_"; - - /** - * Enumeration for the various storage types. - */ - enum TYPE {FILESYSTEM, TABLE} - - /** - * Return the type of storage. - * - * @return storage type - */ - TYPE getType(); - - /** - * Return the uri template. - * - * @return uri template - */ - String getUriTemplate(); - - /** - * Return the uri template for a given location type. - * - * @param locationType type of location, applies only to filesystem type - * @return uri template - */ - String getUriTemplate(LocationType locationType); - - /** - * Check for equality of this instance against the one in question. - * - * @param toCompareAgainst instance to compare - * @return true if identical else false - * @throws FalconException an exception - */ - boolean isIdentical(Storage toCompareAgainst) throws FalconException; - - /** - * Check the permission on the storage, regarding owner/group/permission coming from ACL. - * - * @param acl the ACL defined in the entity. - * @throws FalconException if the permissions are not valid. - */ - void validateACL(AccessControlList acl) throws FalconException; - - /** - * Get Feed Listing for a feed between a date range. - * - * @param feed feed whose instances are listed - * @param cluster name of the cluster - * @param locationType type of location to list - * @param start start of the date range - * @param end end of the date range - * @return status entries for the feed instances in the range - * @throws FalconException an exception - */ - List<FeedInstanceStatus> getListing(Feed feed, String cluster, LocationType locationType, - Date start, Date end) throws FalconException; - - - /** - * Checks the availability status for a given feed instance. - * - * @param feed feed the instance belongs to - * @param clusterName name of the cluster - * @param locationType type of location to check - * @param instanceTime time of the feed instance - * @return availability status of the instance - * @throws FalconException an exception - */ - FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName, - LocationType locationType, - Date instanceTime) throws FalconException; - - - /** - * Delete the instances of the feeds which are older than the retentionLimit specified.
- * - * @param retentionLimit - retention limit of the feed e.g. hours(5). - * @param timeZone - timeZone for the feed definition. - * @param logFilePath - logFile to be used to record the deleted instances. - * @return - StringBuilder containing comma separated list of dates for the deleted instances. - * @throws FalconException an exception - */ - StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath) throws FalconException; -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java deleted file mode 100644 index c58be64..0000000 --- a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.falcon.entity;

import org.apache.falcon.Pair;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Convenient builder for workflow name.
 * @param <T> type of the entity the workflow name is built for
 */
public class WorkflowNameBuilder<T extends Entity> {
    private static final String PREFIX = "FALCON";

    private final T entity;
    private Tag tag;
    private List<String> suffixes;

    public WorkflowNameBuilder(T entity) {
        this.entity = entity;
    }

    public void setTag(Tag tag) {
        this.tag = tag;
    }

    public void setSuffixes(List<String> suffixes) {
        this.suffixes = suffixes;
    }

    /**
     * Builds the workflow name from the entity, the optional tag and the optional suffixes.
     *
     * @return a workflow name; a missing tag is omitted and missing suffixes are treated as empty
     */
    public WorkflowName getWorkflowName() {
        return new WorkflowName(PREFIX, entity.getEntityType().name(),
                tag == null ? null : tag.name(), entity.getName(),
                suffixes == null ? new ArrayList<String>() : suffixes);
    }

    /**
     * Extracts the tag from a workflow name.
     *
     * @param workflowName full workflow name
     * @return the tag, or {@code null} when the name does not follow the workflow name pattern
     */
    public Tag getWorkflowTag(String workflowName) {
        // parse once; the original evaluated the regex twice per call
        Pair<Tag, String> tagAndSuffixes = WorkflowName.getTagAndSuffixes(workflowName);
        return tagAndSuffixes == null ? null : tagAndSuffixes.first;
    }

    /**
     * Extracts the suffix part from a workflow name.
     *
     * @param workflowName full workflow name
     * @return the suffixes, or an empty string when the name does not follow the workflow name pattern
     */
    public String getWorkflowSuffixes(String workflowName) {
        Pair<Tag, String> tagAndSuffixes = WorkflowName.getTagAndSuffixes(workflowName);
        return tagAndSuffixes == null ? "" : tagAndSuffixes.second;
    }

    /**
     * Workflow name.
     */
    public static class WorkflowName {
        private static final String SEPARATOR = "_";
        private static final Pattern WF_NAME_PATTERN;

        private final String prefix;
        private final String entityType;
        private final String tag;
        private final String entityName;
        private final List<String> suffixes;

        static {
            String name = "([a-zA-Z][\\-a-zA-Z0-9]*)";

            String suffix = "([_A-Za-z0-9-.]*)";

            // groups: 1 = entity type, 2 = tag, 3 = entity name, 4 = suffixes
            String namePattern = PREFIX + SEPARATOR + alternationOf(EntityType.values())
                    + SEPARATOR + alternationOf(Tag.values())
                    + SEPARATOR + name + suffix;

            WF_NAME_PATTERN = Pattern.compile(namePattern);
        }

        /** Builds a capturing group of the form {@code (A|B|C)} from the constant names. */
        private static String alternationOf(Enum<?>[] constants) {
            StringBuilder group = new StringBuilder("(");
            for (Enum<?> constant : constants) {
                if (group.length() > 1) {
                    group.append("|");
                }
                group.append(constant.name());
            }
            return group.append(")").toString();
        }

        public WorkflowName(String prefix, String entityType, String tag,
                            String entityName, List<String> suffixes) {
            this.prefix = prefix;
            this.entityType = entityType;
            this.tag = tag;
            this.entityName = entityName;
            this.suffixes = suffixes;
        }

        /**
         * Renders {@code prefix_entityType[_tag]_entityName[_suffix...]}.
         */
        @Override
        public String toString() {
            StringBuilder builder = new StringBuilder();
            builder.append(prefix).append(SEPARATOR).append(entityType)
                    .append(tag == null ? "" : SEPARATOR + tag)
                    .append(SEPARATOR).append(entityName);

            // the public constructor accepts null suffixes; treat them as "no suffixes"
            if (suffixes != null) {
                for (String suffix : suffixes) {
                    builder.append(SEPARATOR).append(suffix);
                }
            }

            return builder.toString();
        }

        /**
         * Parses the tag and the suffix part out of a workflow name.
         *
         * @param workflowName full workflow name
         * @return pair of tag and suffixes, or {@code null} when the whole name does not match the pattern
         */
        public static Pair<Tag, String> getTagAndSuffixes(String workflowName) {
            Matcher matcher = WF_NAME_PATTERN.matcher(workflowName);
            // a successful matches() already exposes the groups; no reset()/find() pass is needed
            if (!matcher.matches()) {
                return null;
            }
            return new Pair<>(Tag.valueOf(matcher.group(2)), matcher.group(4));
        }

        /**
         * Parses the entity name and type out of a workflow name.
         *
         * @param workflowName full workflow name
         * @return pair of entity name and type, or {@code null} when the whole name does not match the pattern
         */
        public static Pair<String, EntityType> getEntityNameAndType(String workflowName) {
            Matcher matcher = WF_NAME_PATTERN.matcher(workflowName);
            if (!matcher.matches()) {
                return null;
            }
            return new Pair<>(matcher.group(3), EntityType.valueOf(matcher.group(1)));
        }
    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java deleted file mode 100644 index 51568fb..0000000 --- a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.falcon.entity.common;

import java.util.Calendar;
import java.util.regex.Pattern;

/**
 * Maps the time placeholders of a feed path (such as <code>${YEAR}</code>) to calendar fields.
 */
public final class FeedDataPath {

    private FeedDataPath() {}

    /**
     * Time placeholders understood in feed paths. Each constant carries the regular
     * expression its value must satisfy, the {@link Calendar} field it maps to and the
     * number of digits of its value.
     */
    public enum VARS {
        YEAR("([0-9]{4})", Calendar.YEAR, 4),
        MONTH("(0[1-9]|1[0-2])", Calendar.MONTH, 2),
        DAY("(0[1-9]|1[0-9]|2[0-9]|3[0-1])", Calendar.DAY_OF_MONTH, 2),
        HOUR("([0-1][0-9]|2[0-4])", Calendar.HOUR_OF_DAY, 2),
        MINUTE("([0-5][0-9]|60)", Calendar.MINUTE, 2);

        private final Pattern pattern;
        private final String valuePattern;
        private final int calendarField;
        private final int valueSize;

        VARS(String valueRegex, int field, int digits) {
            this.valuePattern = valueRegex;
            this.calendarField = field;
            this.valueSize = digits;
            // matches the literal placeholder, e.g. ${YEAR}
            this.pattern = Pattern.compile("\\$\\{" + name() + "\\}");
        }

        /** @return regular expression a concrete value of this placeholder must match */
        public String getValuePattern() {
            return valuePattern;
        }

        /** @return regular expression matching the placeholder itself */
        public String regex() {
            return pattern.pattern();
        }

        /** @return the {@link Calendar} field constant this placeholder maps to */
        public int getCalendarField() {
            return calendarField;
        }

        /** @return number of digits of a value of this placeholder */
        public int getValueSize() {
            return valueSize;
        }

        /**
         * Stores {@code value} into the calendar field of this placeholder.
         *
         * @param cal calendar to update
         * @param value value as it appears in the path; a month is given as 1-12
         */
        public void setCalendar(Calendar cal, int value) {
            // Calendar months are zero-based, path months are one-based
            int fieldValue = (this == MONTH) ? value - 1 : value;
            cal.set(calendarField, fieldValue);
        }

        /**
         * Resolves a placeholder string to its constant.
         *
         * @param str text that must consist of exactly one placeholder
         * @return the matching constant, or {@code null} if none matches
         */
        public static VARS from(String str) {
            VARS[] candidates = VARS.values();
            for (int i = 0; i < candidates.length; i++) {
                if (candidates[i].pattern.matcher(str).matches()) {
                    return candidates[i];
                }
            }
            return null;
        }
    }

    /** Matches any one of the placeholders, in declaration order. */
    public static final Pattern PATTERN = Pattern.compile(anyPlaceholderRegex());

    /** Joins the placeholder regexes of all constants with {@code |}. */
    private static String anyPlaceholderRegex() {
        StringBuilder alternatives = new StringBuilder();
        for (VARS var : VARS.values()) {
            if (alternatives.length() > 0) {
                alternatives.append("|");
            }
            alternatives.append(var.regex());
        }
        return alternatives.toString();
    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java b/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java deleted file mode 100644 index 0cf2722..0000000 --- a/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.entity.lock; - -import org.apache.falcon.entity.v0.Entity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentHashMap; - -/** - * In memory resource locking that provides lock capabilities. 
- */
public final class MemoryLocks {
    private static final Logger LOG = LoggerFactory.getLogger(MemoryLocks.class);

    /** Keys of the entities that are currently locked; the stored value is always {@code true}. */
    private static final ConcurrentHashMap<String, Boolean> LOCKS = new ConcurrentHashMap<String, Boolean>();

    private static final MemoryLocks INSTANCE = new MemoryLocks();

    private MemoryLocks() {
    }

    public static MemoryLocks getInstance() {
        return INSTANCE;
    }

    /**
     * Obtain a lock for an entity.
     *
     * @param entity entity object.
     * @param command name of the operation requesting the lock; only used for logging.
     * @return <code>true</code> if the lock was obtained, <code>false</code> if the entity
     *         is already locked.
     */
    public boolean acquireLock(Entity entity, String command) {
        String entityName = getLockKey(entity);

        // putIfAbsent returns null only when no entry existed, i.e. this call took the lock.
        // Entries are only ever stored as true, so the former "|| !putResponse" test was dead.
        boolean lockObtained = LOCKS.putIfAbsent(entityName, Boolean.TRUE) == null;
        if (lockObtained) {
            LOG.info("Lock acquired for {} on {} by {}",
                    command, entity.toShortString(), Thread.currentThread().getName());
        }
        return lockObtained;
    }

    /**
     * Release the lock for an entity. The entry is removed regardless of which thread
     * acquired it.
     *
     * @param entity entity object.
     */
    public void releaseLock(Entity entity) {
        String entityName = getLockKey(entity);

        LOCKS.remove(entityName);
        LOG.info("Successfully released lock on {} by {}",
                entity.toShortString(), Thread.currentThread().getName());
    }

    /** Lock key of the form {@code <entity type>.<entity name>}. */
    private String getLockKey(Entity entity) {
        return entity.getEntityType().toString() + "." + entity.getName();
    }


}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java deleted file mode 100644 index bef4b39..0000000 --- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java +++ /dev/null @@ -1,405 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.parser; - -import org.apache.commons.lang.Validate; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.catalog.CatalogServiceFactory; -import org.apache.falcon.entity.ClusterHelper; -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.cluster.ACL; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.cluster.ClusterLocationType; -import org.apache.falcon.entity.v0.cluster.Interface; -import org.apache.falcon.entity.v0.cluster.Interfacetype; -import org.apache.falcon.entity.v0.cluster.Location; -import org.apache.falcon.entity.v0.cluster.Properties; -import org.apache.falcon.entity.v0.cluster.Property; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.security.SecurityUtil; -import org.apache.falcon.util.StartupProperties; -import org.apache.falcon.workflow.WorkflowEngineFactory; -import org.apache.falcon.workflow.util.OozieConstants; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.ConnectionFactory; -import java.io.IOException; -import java.net.URI; -import java.util.HashSet; -import java.util.List; - -/** - * Parser that parses cluster entity definition. 
- */ -public class ClusterEntityParser extends EntityParser<Cluster> { - - private static final Logger LOG = LoggerFactory.getLogger(ClusterEntityParser.class); - - public ClusterEntityParser() { - super(EntityType.CLUSTER); - } - - @Override - public void validate(Cluster cluster) throws ValidationException { - // validating scheme in light of fail-early - validateScheme(cluster, Interfacetype.READONLY); - validateScheme(cluster, Interfacetype.WRITE); - validateScheme(cluster, Interfacetype.WORKFLOW); - // User may choose to disable job completion notifications - if (ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING) != null) { - validateScheme(cluster, Interfacetype.MESSAGING); - } - if (CatalogServiceFactory.isEnabled() - && ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY) != null) { - validateScheme(cluster, Interfacetype.REGISTRY); - } - - validateACL(cluster); - - if (!EntityUtil.responsibleFor(cluster.getColo())) { - return; - } - - validateReadInterface(cluster); - validateWriteInterface(cluster); - validateExecuteInterface(cluster); - validateWorkflowInterface(cluster); - validateMessagingInterface(cluster); - validateRegistryInterface(cluster); - validateLocations(cluster); - validateProperties(cluster); - } - - private void validateScheme(Cluster cluster, Interfacetype interfacetype) - throws ValidationException { - final String endpoint = ClusterHelper.getInterface(cluster, interfacetype).getEndpoint(); - URI uri = new Path(endpoint).toUri(); - if (uri.getScheme() == null) { - if (Interfacetype.WORKFLOW == interfacetype - && uri.toString().equals(OozieConstants.LOCAL_OOZIE)) { - return; - } - throw new ValidationException("Cannot get valid scheme for interface: " - + interfacetype + " of cluster: " + cluster.getName()); - } - } - - private void validateReadInterface(Cluster cluster) throws ValidationException { - final String readOnlyStorageUrl = ClusterHelper.getReadOnlyStorageUrl(cluster); - LOG.info("Validating read interface: 
{}", readOnlyStorageUrl); - - validateFileSystem(cluster, readOnlyStorageUrl); - } - - private void validateWriteInterface(Cluster cluster) throws ValidationException { - final String writeStorageUrl = ClusterHelper.getStorageUrl(cluster); - LOG.info("Validating write interface: {}", writeStorageUrl); - - validateFileSystem(cluster, writeStorageUrl); - } - - private void validateFileSystem(Cluster cluster, String storageUrl) throws ValidationException { - try { - Configuration conf = new Configuration(); - conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl); - conf.setInt("ipc.client.connect.max.retries", 10); - - if (UserGroupInformation.isSecurityEnabled()) { - String nameNodePrincipal = ClusterHelper.getPropertyValue(cluster, SecurityUtil.NN_PRINCIPAL); - Validate.notEmpty(nameNodePrincipal, - "Cluster definition missing required namenode credential property: " + SecurityUtil.NN_PRINCIPAL); - - conf.set(SecurityUtil.NN_PRINCIPAL, nameNodePrincipal); - } - - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf); - fs.exists(new Path("/")); - } catch (Exception e) { - throw new ValidationException("Invalid storage server or port: " + storageUrl - + ", " + e.getMessage(), e); - } - } - - private void validateExecuteInterface(Cluster cluster) throws ValidationException { - String executeUrl = ClusterHelper.getMREndPoint(cluster); - LOG.info("Validating execute interface: {}", executeUrl); - - try { - HadoopClientFactory.get().validateJobClient(executeUrl); - } catch (IOException e) { - throw new ValidationException("Invalid Execute server or port: " + executeUrl, e); - } - } - - protected void validateWorkflowInterface(Cluster cluster) throws ValidationException { - final String workflowUrl = ClusterHelper.getOozieUrl(cluster); - LOG.info("Validating workflow interface: {}", workflowUrl); - if (OozieConstants.LOCAL_OOZIE.equals(workflowUrl)) { - return; - } - try { - if (!WorkflowEngineFactory.getWorkflowEngine().isAlive(cluster)) { - 
throw new ValidationException("Unable to reach Workflow server:" + workflowUrl); - } - } catch (FalconException e) { - throw new ValidationException("Invalid Workflow server or port: " + workflowUrl, e); - } - } - - protected void validateMessagingInterface(Cluster cluster) throws ValidationException { - // Validate only if user has specified this - final Interface messagingInterface = ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING); - if (messagingInterface == null) { - LOG.info("Messaging service is not enabled for cluster: {}", cluster.getName()); - return; - } - - final String messagingUrl = ClusterHelper.getMessageBrokerUrl(cluster); - final String implementation = StartupProperties.get().getProperty("broker.impl.class", - "org.apache.activemq.ActiveMQConnectionFactory"); - LOG.info("Validating messaging interface: {}, implementation: {}", messagingUrl, implementation); - - try { - @SuppressWarnings("unchecked") - Class<ConnectionFactory> clazz = (Class<ConnectionFactory>) - getClass().getClassLoader().loadClass(implementation); - ConnectionFactory connectionFactory = clazz.getConstructor( - String.class, String.class, String.class).newInstance("", "", messagingUrl); - connectionFactory.createConnection(); - } catch (Exception e) { - throw new ValidationException("Invalid Messaging server or port: " + messagingUrl - + " for: " + implementation, e); - } - } - - protected void validateRegistryInterface(Cluster cluster) throws ValidationException { - final boolean isCatalogRegistryEnabled = CatalogServiceFactory.isEnabled(); - if (!isCatalogRegistryEnabled) { - return; // ignore the registry interface for backwards compatibility - } - - // continue validation only if a catalog service is provided - final Interface catalogInterface = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY); - if (catalogInterface == null) { - LOG.info("Catalog service is not enabled for cluster: {}", cluster.getName()); - return; - } - - final String catalogUrl = 
catalogInterface.getEndpoint(); - LOG.info("Validating catalog registry interface: {}", catalogUrl); - - try { - Configuration clusterConf = ClusterHelper.getConfiguration(cluster); - if (UserGroupInformation.isSecurityEnabled()) { - String metaStorePrincipal = clusterConf.get(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL); - Validate.notEmpty(metaStorePrincipal, - "Cluster definition missing required metastore credential property: " - + SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL); - } - - if (!CatalogServiceFactory.getCatalogService().isAlive(clusterConf, catalogUrl)) { - throw new ValidationException("Unable to reach Catalog server:" + catalogUrl); - } - } catch (FalconException e) { - throw new ValidationException("Invalid Catalog server or port: " + catalogUrl, e); - } - } - - /** - * Validate ACL if authorization is enabled. - * - * @param cluster cluster entity - * @throws ValidationException - */ - private void validateACL(Cluster cluster) throws ValidationException { - if (isAuthorizationDisabled) { - return; - } - - // Validate the entity owner is logged-in, authenticated user if authorization is enabled - final ACL clusterACL = cluster.getACL(); - if (clusterACL == null) { - throw new ValidationException("Cluster ACL cannot be empty for: " + cluster.getName()); - } - - validateACLOwnerAndGroup(clusterACL); - - try { - authorize(cluster.getName(), clusterACL); - } catch (AuthorizationException e) { - throw new ValidationException(e); - } - } - - /** - * Validate the locations on the cluster exists with appropriate permissions - * for the user to write to this directory. 
- * - * @param cluster cluster entity - * @throws ValidationException - */ - protected void validateLocations(Cluster cluster) throws ValidationException { - Configuration conf = ClusterHelper.getConfiguration(cluster); - FileSystem fs; - try { - fs = HadoopClientFactory.get().createFalconFileSystem(conf); - } catch (FalconException e) { - throw new ValidationException("Unable to get file system handle for cluster " + cluster.getName(), e); - } - - Location stagingLocation = ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING); - if (stagingLocation == null) { - throw new ValidationException( - "Unable to find the mandatory location of name: " + ClusterLocationType.STAGING.value() - + " for cluster " + cluster.getName()); - } else { - checkPathOwnerAndPermission(cluster.getName(), stagingLocation.getPath(), fs, - HadoopClientFactory.ALL_PERMISSION); - if (!ClusterHelper.checkWorkingLocationExists(cluster)) { - //Creating location type of working in the sub dir of staging dir with perms 755. 
FALCON-910 - createWorkingDirUnderStaging(fs, cluster, stagingLocation); - } else { - Location workingLocation = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING); - if (stagingLocation.getPath().equals(workingLocation.getPath())) { - throw new ValidationException( - "Location with name: " + stagingLocation.getName().value() + " and " + workingLocation - .getName().value() + " cannot have same path: " + stagingLocation.getPath() - + " for cluster :" + cluster.getName()); - } else { - checkPathOwnerAndPermission(cluster.getName(), workingLocation.getPath(), fs, - HadoopClientFactory.READ_EXECUTE_PERMISSION); - } - } - // Create staging subdirs falcon/workflows/feed and falcon/workflows/process : Falcon-1647 - createStagingSubdirs(fs, cluster, stagingLocation, - "falcon/workflows/feed", HadoopClientFactory.ALL_PERMISSION); - createStagingSubdirs(fs, cluster, stagingLocation, - "falcon/workflows/process", HadoopClientFactory.ALL_PERMISSION); - } - } - - private void createWorkingDirUnderStaging(FileSystem fs, Cluster cluster, - Location stagingLocation) throws ValidationException { - Path workingDirPath = new Path(stagingLocation.getPath(), ClusterHelper.WORKINGDIR); - try { - if (!fs.exists(workingDirPath)) { //Checking if the staging dir has the working dir to be created - HadoopClientFactory.mkdirs(fs, workingDirPath, HadoopClientFactory.READ_EXECUTE_PERMISSION); - } else { - if (fs.isDirectory(workingDirPath)) { - FsPermission workingPerms = fs.getFileStatus(workingDirPath).getPermission(); - if (!workingPerms.equals(HadoopClientFactory.READ_EXECUTE_PERMISSION)) { //perms check - throw new ValidationException( - "Falcon needs subdir " + ClusterHelper.WORKINGDIR + " inside staging dir:" - + stagingLocation.getPath() - + " when staging location not specified with " - + HadoopClientFactory.READ_EXECUTE_PERMISSION.toString() + " got " - + workingPerms.toString()); - } - } else { - throw new ValidationException( - "Falcon needs subdir " + 
ClusterHelper.WORKINGDIR + " inside staging dir:" - + stagingLocation.getPath() - + " when staging location not specified. Got a file at " + workingDirPath - .toString()); - } - } - } catch (IOException e) { - throw new ValidationException( - "Unable to create path for " + workingDirPath.toString() + " with path: " - + workingDirPath.toString() + " for cluster " + cluster.getName(), e); - } - } - - private void createStagingSubdirs(FileSystem fs, Cluster cluster, Location stagingLocation, - String path, FsPermission permission) throws ValidationException { - Path subdirPath = new Path(stagingLocation.getPath(), path); - try { - HadoopClientFactory.mkdirs(fs, subdirPath, permission); - } catch (IOException e) { - throw new ValidationException( - "Unable to create path " - + subdirPath.toString() + " for cluster " + cluster.getName(), e); - } - } - - protected void validateProperties(Cluster cluster) throws ValidationException { - Properties properties = cluster.getProperties(); - if (properties == null) { - return; // Cluster has no properties to validate. 
- } - - List<Property> propertyList = cluster.getProperties().getProperties(); - HashSet<String> propertyKeys = new HashSet<String>(); - for (Property prop : propertyList) { - if (StringUtils.isBlank(prop.getName())) { - throw new ValidationException("Property name and value cannot be empty for Cluster: " - + cluster.getName()); - } - if (!propertyKeys.add(prop.getName())) { - throw new ValidationException("Multiple properties with same name found for Cluster: " - + cluster.getName()); - } - } - } - - private void checkPathOwnerAndPermission(String clusterName, String location, FileSystem fs, - FsPermission expectedPermission) throws ValidationException { - - Path locationPath = new Path(location); - try { - if (!fs.exists(locationPath)) { - throw new ValidationException("Location " + location + " for cluster " + clusterName + " must exist."); - } - - // falcon owns this path on each cluster - final String loginUser = UserGroupInformation.getLoginUser().getShortUserName(); - FileStatus fileStatus = fs.getFileStatus(locationPath); - final String locationOwner = fileStatus.getOwner(); - if (!locationOwner.equals(loginUser)) { - LOG.error("Owner of the location {} is {} for cluster {}. Current user {} is not the owner of the " - + "location.", locationPath, locationOwner, clusterName, loginUser); - throw new ValidationException("Path [" + locationPath + "] on the cluster [" + clusterName + "] has " - + "owner [" + locationOwner + "]. 
Current user [" + loginUser + "] is not the owner of the " - + "path"); - } - String errorMessage = "Path " + locationPath + " has permissions: " + fileStatus.getPermission().toString() - + ", should be " + expectedPermission; - if (fileStatus.getPermission().toShort() != expectedPermission.toShort()) { - LOG.error(errorMessage); - throw new ValidationException(errorMessage); - } - // try to list to see if the user is able to write to this folder - fs.listStatus(locationPath); - } catch (IOException e) { - throw new ValidationException( - "Unable to validate the location with path: " + location + " for cluster:" + clusterName - + " due to transient failures ", e); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java b/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java deleted file mode 100644 index 18ae754..0000000 --- a/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity.parser; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.ProcessHelper; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Cluster; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.entity.v0.process.Validity; -import org.apache.falcon.expression.ExpressionHelper; -import org.apache.falcon.util.DateUtil; - -import java.util.Date; - -/** - * Validation helper functions to validate across process, feed and cluster definitions. - */ -public final class CrossEntityValidations { - - private CrossEntityValidations() {} - - public static void validateInstanceRange(Process process, Input input, Feed feed) throws FalconException { - - try { - for (Cluster cluster : process.getClusters().getClusters()) { - String clusterName = cluster.getName(); - org.apache.falcon.entity.v0.feed.Validity feedValidity = FeedHelper.getCluster(feed, - clusterName).getValidity(); - - // Optinal end_date - if (feedValidity.getEnd() == null) { - feedValidity.setEnd(DateUtil.NEVER); - } - - Date feedStart = feedValidity.getStart(); - Date feedEnd = feedValidity.getEnd(); - - String instStartEL = input.getStart(); - String instEndEL = input.getEnd(); - ExpressionHelper evaluator = ExpressionHelper.get(); - - Validity processValidity = ProcessHelper.getCluster(process, clusterName).getValidity(); - ExpressionHelper.setReferenceDate(processValidity.getStart()); - Date instStart = evaluator.evaluate(instStartEL, Date.class); - Date instEnd = evaluator.evaluate(instEndEL, Date.class); - if (instStart.before(feedStart)) { - throw new ValidationException("Start instance " + instStartEL + " of feed " + feed.getName() - 
+ " is before the start of feed " + feedValidity.getStart() + " for cluster " - + clusterName); - } - - if (instEnd.before(instStart)) { - throw new ValidationException("End instance " + instEndEL + " for feed " + feed.getName() - + " is before the start instance " + instStartEL + " for cluster " + clusterName); - } - - if (instEnd.after(feedEnd)) { - throw new ValidationException("End instance " + instEndEL + " for feed " + feed.getName() - + " is after the end of feed " + feedValidity.getEnd() + " for cluster " + clusterName); - } - } - } catch (ValidationException e) { - throw e; - } catch (Exception e) { - throw new FalconException(e); - } - } - - public static void validateFeedRetentionPeriod(String startInstance, Feed feed, String clusterName) - throws FalconException { - - String feedRetention = FeedHelper.getCluster(feed, clusterName).getRetention().getLimit().toString(); - ExpressionHelper evaluator = ExpressionHelper.get(); - - Date now = new Date(); - ExpressionHelper.setReferenceDate(now); - Date instStart = evaluator.evaluate(startInstance, Date.class); - long feedDuration = evaluator.evaluate(feedRetention, Long.class); - Date feedStart = new Date(now.getTime() - feedDuration); - - if (instStart.before(feedStart)) { - throw new ValidationException("StartInstance :" + startInstance + " of process is out of range for Feed: " - + feed.getName() + " in cluster: " + clusterName + "'s retention limit :" + feedRetention); - } - } - - // Mapping to oozie coord's dataset fields - public static void validateInstance(Process process, Output output, Feed feed) throws FalconException { - - try { - for (Cluster cluster : process.getClusters().getClusters()) { - String clusterName = cluster.getName(); - org.apache.falcon.entity.v0.feed.Validity feedValidity = FeedHelper.getCluster(feed, - clusterName).getValidity(); - Date feedStart = feedValidity.getStart(); - Date feedEnd = feedValidity.getEnd(); - - String instEL = output.getInstance(); - ExpressionHelper 
evaluator = ExpressionHelper.get(); - Validity processValidity = ProcessHelper.getCluster(process, clusterName).getValidity(); - ExpressionHelper.setReferenceDate(processValidity.getStart()); - Date inst = evaluator.evaluate(instEL, Date.class); - if (inst.before(feedStart)) { - throw new ValidationException("Instance " + instEL + " of feed " + feed.getName() - + " is before the start of feed " + feedValidity.getStart() + " for cluster" + clusterName); - } - - if (inst.after(feedEnd)) { - throw new ValidationException("End instance " + instEL + " for feed " + feed.getName() - + " is after the end of feed " + feedValidity.getEnd() + " for cluster" + clusterName); - } - } - } catch (ValidationException e) { - throw e; - } catch (Exception e) { - throw new FalconException(e); - } - } - - public static void validateInputPartition(Input input, Feed feed) throws ValidationException { - String[] parts = input.getPartition().split("/"); - if (feed.getPartitions() == null || feed.getPartitions().getPartitions().isEmpty() - || feed.getPartitions().getPartitions().size() < parts.length) { - throw new ValidationException("Partition specification in input " + input.getName() + " is wrong"); - } - } - - public static void validateFeedDefinedForCluster(Feed feed, String clusterName) throws FalconException { - if (FeedHelper.getCluster(feed, clusterName) == null) { - throw new ValidationException("Feed " + feed.getName() + " is not defined for cluster " + clusterName); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java deleted file mode 100644 index 998f952..0000000 --- 
a/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity.parser; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.DatasourceHelper; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.datasource.ACL; -import org.apache.falcon.entity.v0.datasource.Datasource; -import org.apache.falcon.entity.v0.datasource.Interfacetype; -import org.apache.falcon.util.HdfsClassLoader; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.sql.Connection; -import java.util.Arrays; -import java.util.Properties; - -/** - * Parser for DataSource entity definition. 
- */ - -public class DatasourceEntityParser extends EntityParser<Datasource> { - - private static final Logger LOG = LoggerFactory.getLogger(DatasourceEntityParser.class); - - public DatasourceEntityParser() { - super(EntityType.DATASOURCE); - } - - @Override - public void validate(Datasource db) throws FalconException { - try { - ClassLoader hdfsClassLoader = HdfsClassLoader.load(db.getName(), db.getDriver().getJars()); - validateInterface(db, Interfacetype.READONLY, hdfsClassLoader); - validateInterface(db, Interfacetype.WRITE, hdfsClassLoader); - validateACL(db); - } catch(IOException io) { - throw new ValidationException("Unable to copy driver jars to local dir: " - + Arrays.toString(db.getDriver().getJars().toArray())); - } - } - - private static void validateInterface(Datasource db, Interfacetype interfacetype, ClassLoader hdfsClassLoader) - throws ValidationException { - String endpoint = null; - Properties userPasswdInfo = null; - try { - if (interfacetype == Interfacetype.READONLY) { - endpoint = DatasourceHelper.getReadOnlyEndpoint(db); - userPasswdInfo = DatasourceHelper.fetchReadPasswordInfo(db); - } else if (interfacetype == Interfacetype.WRITE) { - endpoint = DatasourceHelper.getWriteEndpoint(db); - userPasswdInfo = DatasourceHelper.fetchWritePasswordInfo(db); - } - if (StringUtils.isNotBlank(endpoint)) { - LOG.info("Validating {} endpoint {} connection.", interfacetype.value(), endpoint); - validateConnection(hdfsClassLoader, db.getDriver().getClazz(), endpoint, userPasswdInfo); - } - } catch(FalconException fe) { - throw new ValidationException(String.format("Cannot validate '%s' " - + "interface '%s' " + "of database entity '%s' due to '%s' ", - interfacetype, endpoint, - db.getName(), fe.getMessage())); - } - } - - private static void validateConnection(ClassLoader hdfsClassLoader, String driverClass, - String connectUrl, Properties userPasswdInfo) - throws FalconException { - ClassLoader previousClassLoader = 
Thread.currentThread().getContextClassLoader(); - LOG.info("Preserving current classloader: {}", previousClassLoader.toString()); - try { - Thread.currentThread().setContextClassLoader(hdfsClassLoader); - LOG.info("Setting context classloader to : {}", hdfsClassLoader.toString()); - java.sql.Driver driver = (java.sql.Driver) hdfsClassLoader.loadClass(driverClass).newInstance(); - LOG.info("Validating connection URL: {} using driver: {}", connectUrl, driver.getClass().toString()); - Connection con = driver.connect(connectUrl, userPasswdInfo); - if (con == null) { - throw new FalconException("DriverManager.getConnection() return " - + "null for URL : " + connectUrl); - } - } catch (Exception ex) { - LOG.error("Exception while validating connection : ", ex); - throw new FalconException(ex); - } finally { - Thread.currentThread().setContextClassLoader(previousClassLoader); - LOG.info("Restoring original classloader {}", previousClassLoader.toString()); - } - } - - /** - * Validate ACL if authorization is enabled. 
- * - * @param db database entity - * @throws ValidationException - */ - private void validateACL(Datasource db) throws ValidationException { - if (isAuthorizationDisabled) { - return; - } - - // Validate the entity owner is logged-in, authenticated user if authorization is enabled - final ACL dbACL = db.getACL(); - if (dbACL == null) { - throw new ValidationException("Datasource ACL cannot be empty for: " + db.getName()); - } - - validateACLOwnerAndGroup(dbACL); - - try { - authorize(db.getName(), dbACL); - } catch (AuthorizationException e) { - throw new ValidationException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java deleted file mode 100644 index 05b204d..0000000 --- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.parser; - -import org.apache.commons.io.IOUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.AccessControlList; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.security.CurrentUser; -import org.apache.falcon.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.xml.bind.Unmarshaller; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; - -/** - * Generic Abstract Entity Parser, the concrete FEED, PROCESS and CLUSTER should extend this parser - * to implement specific parsing. - * - * @param <T> of type Entity - */ -public abstract class EntityParser<T extends Entity> { - - private static final Logger LOG = LoggerFactory.getLogger(EntityParser.class); - - private final EntityType entityType; - protected final boolean isAuthorizationDisabled; - - protected EntityParser(EntityType entityType) { - this.entityType = entityType; - isAuthorizationDisabled = !SecurityUtil.isAuthorizationEnabled(); - } - - public EntityType getEntityType() { - return this.entityType; - } - - /** - * Parses a sent XML and validates it using JAXB. - * - * @param xmlString - Entity XML - * @return Entity - JAVA Object - * @throws FalconException - */ - public Entity parseAndValidate(String xmlString) throws FalconException { - InputStream inputStream = null; - try { - inputStream = new ByteArrayInputStream(xmlString.getBytes()); - return parseAndValidate(inputStream); - } finally { - IOUtils.closeQuietly(inputStream); - } - } - - /** - * Parses xml stream. 
- * - * @param xmlStream stream - * @return entity - * @throws FalconException - */ - @SuppressWarnings("unchecked") - public T parse(InputStream xmlStream) throws FalconException { - try { - // parse against schema - Unmarshaller unmarshaller = entityType.getUnmarshaller(); - T entity = (T) unmarshaller.unmarshal(xmlStream); - LOG.info("Parsed Entity: {}", entity.getName()); - return entity; - } catch (Exception e) { - throw new FalconException(e); - } - } - - public T parseAndValidate(InputStream xmlStream) throws FalconException { - T entity = parse(xmlStream); - validate(entity); - return entity; - } - - protected void validateEntityExists(EntityType type, String name) throws FalconException { - if (ConfigurationStore.get().get(type, name) == null) { - throw new ValidationException("Referenced " + type + " " + name + " is not registered"); - } - } - - public abstract void validate(T entity) throws FalconException; - - /** - * Checks if the acl owner is a valid user by fetching the groups for the owner. - * Also checks if the acl group is one of the fetched groups for membership. - * The only limitation is that a user cannot add a group in ACL that he does not belong to. 
- * - * @param acl entity ACL - * @throws org.apache.falcon.entity.parser.ValidationException - */ - protected void validateACLOwnerAndGroup(AccessControlList acl) throws ValidationException { - String aclOwner = acl.getOwner(); - String aclGroup = acl.getGroup(); - - try { - UserGroupInformation proxyACLUser = UserGroupInformation.createProxyUser( - aclOwner, UserGroupInformation.getLoginUser()); - Set<String> groups = new HashSet<String>(Arrays.asList(proxyACLUser.getGroupNames())); - if (!groups.contains(aclGroup)) { - throw new AuthorizationException("Invalid group: " + aclGroup - + " for user: " + aclOwner); - } - } catch (IOException e) { - throw new ValidationException("Invalid acl owner " + aclOwner - + ", does not exist or does not belong to group: " + aclGroup); - } - } - - /** - * Validate if the entity owner is the logged-in authenticated user. - * - * @param entityName entity name - * @param acl entity ACL - * @throws AuthorizationException - */ - protected void authorize(String entityName, - AccessControlList acl) throws AuthorizationException { - try { - SecurityUtil.getAuthorizationProvider().authorizeEntity(entityName, - getEntityType().name(), acl, "submit", CurrentUser.getAuthenticatedUGI()); - } catch (FalconException e) { - throw new AuthorizationException(e); - } catch (IOException e) { - throw new AuthorizationException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java deleted file mode 100644 index b497770..0000000 --- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or 
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity.parser; - -import org.apache.falcon.entity.v0.EntityType; - -/** - * Factory Class which returns the Parser based on the EntityType. - */ -public final class EntityParserFactory { - - private EntityParserFactory() { - } - - /** - * Tie EnityType with the Entity Class in one place so that it can be - * unmarshalled easily by concrete classes based on the class type using - * JAXB. - * - * @param entityType - entity type - * @return concrete parser based on entity type - */ - public static EntityParser getParser(final EntityType entityType) { - - switch (entityType) { - case PROCESS: - return new ProcessEntityParser(); - case FEED: - return new FeedEntityParser(); - case CLUSTER: - return new ClusterEntityParser(); - case DATASOURCE: - return new DatasourceEntityParser(); - default: - throw new IllegalArgumentException("Unhandled entity type: " + entityType); - } - } - -}
