http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java deleted file mode 100644 index c942862..0000000 --- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java +++ /dev/null @@ -1,719 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.parser; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.catalog.CatalogServiceFactory; -import org.apache.falcon.entity.CatalogStorage; -import org.apache.falcon.entity.ClusterHelper; -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.FileSystemStorage; -import org.apache.falcon.entity.Storage; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityGraph; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.feed.ACL; -import org.apache.falcon.entity.v0.feed.Extract; -import org.apache.falcon.entity.v0.feed.ExtractMethod; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Cluster; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.entity.v0.feed.MergeType; -import org.apache.falcon.entity.v0.feed.Properties; -import org.apache.falcon.entity.v0.feed.Property; -import org.apache.falcon.entity.v0.feed.Sla; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.expression.ExpressionHelper; -import org.apache.falcon.group.FeedGroup; -import org.apache.falcon.group.FeedGroupMap; -import org.apache.falcon.service.LifecyclePolicyMap; -import org.apache.falcon.util.DateUtil; -import org.apache.falcon.util.HadoopQueueUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import 
java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;

/**
 * Parser that parses and validates a feed entity definition.
 *
 * <p>{@link #validate(Feed)} checks the feed on its own (clusters, lifecycle, ACL, storage, paths,
 * partitions, groups, SLA, properties, Hadoop queues, import/export policies). When the feed replaces
 * one already present in the configuration store, it also re-validates every dependent process
 * against the new definition.
 */
public class FeedEntityParser extends EntityParser<Feed> {

    private static final Logger LOG = LoggerFactory.getLogger(FeedEntityParser.class);

    public FeedEntityParser() {
        super(EntityType.FEED);
    }

    /**
     * Validates the given feed definition.
     *
     * <p>Optional values are defaulted in place: a missing timezone becomes UTC and a missing
     * cluster validity end becomes {@link DateUtil#NEVER}.
     *
     * @param feed feed entity to validate
     * @throws FalconException if the feed is invalid, or if it is an update and a process depending
     *                         on the stored feed is incompatible with the new definition
     */
    @Override
    public void validate(Feed feed) throws FalconException {
        if (feed.getTimezone() == null) {
            feed.setTimezone(TimeZone.getTimeZone("UTC"));
        }

        if (feed.getClusters() == null) {
            throw new ValidationException("Feed should have at least one cluster");
        }

        validateLifecycle(feed);
        validateACL(feed);
        for (Cluster cluster : feed.getClusters().getClusters()) {
            validateEntityExists(EntityType.CLUSTER, cluster.getName());

            // Optional end date: an open-ended validity never expires.
            if (cluster.getValidity().getEnd() == null) {
                cluster.getValidity().setEnd(DateUtil.NEVER);
            }

            validateClusterValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd(),
                    cluster.getName());
            validateClusterHasRegistry(feed, cluster);
            validateFeedCutOffPeriod(feed, cluster);
            if (FeedHelper.isImportEnabled(cluster)) {
                validateEntityExists(EntityType.DATASOURCE, FeedHelper.getImportDatasourceName(cluster));
                validateFeedExtractionType(feed, cluster);
                validateFeedImportArgs(cluster);
                validateFeedImportFieldExcludes(cluster);
            }
            if (FeedHelper.isExportEnabled(cluster)) {
                validateEntityExists(EntityType.DATASOURCE, FeedHelper.getExportDatasourceName(cluster));
                validateFeedExportArgs(cluster);
                validateFeedExportFieldExcludes(cluster);
            }
        }

        validateFeedStorage(feed);
        validateFeedPath(feed);
        validateFeedPartitionExpression(feed);
        validateFeedGroups(feed);
        validateFeedSLA(feed);
        validateProperties(feed);
        validateHadoopQueue(feed);

        // The feed is valid on its own. If it replaces a stored feed, every process depending on the
        // old definition must also be valid against the new one.
        Feed oldFeed = ConfigurationStore.get().get(EntityType.FEED, feed.getName());
        if (oldFeed == null) {
            return; // Not an update case
        }

        EntityGraph graph = EntityGraph.get();
        Set<Entity> referenced = graph.getDependents(oldFeed);
        Set<Process> processes = findProcesses(referenced);
        if (processes.isEmpty()) {
            return;
        }

        ensureValidityFor(feed, processes);
    }

    /**
     * For each cluster with lifecycle enabled, requires a retention stage, checks the retention
     * frequency, and validates every configured lifecycle policy.
     *
     * @param feed feed entity
     * @throws FalconException if a stage is missing, a policy name is unknown, or a policy rejects the feed
     */
    private void validateLifecycle(Feed feed) throws FalconException {
        LifecyclePolicyMap map = LifecyclePolicyMap.get();
        for (Cluster cluster : feed.getClusters().getClusters()) {
            String clusterName = cluster.getName();
            if (!FeedHelper.isLifecycleEnabled(feed, clusterName)) {
                continue;
            }
            if (FeedHelper.getRetentionStage(feed, clusterName) == null) {
                throw new ValidationException("Retention is a mandatory stage, didn't find it for cluster: "
                        + clusterName);
            }
            validateRetentionFrequency(feed, clusterName);
            for (String policyName : FeedHelper.getPolicies(feed, clusterName)) {
                // Report an unregistered policy name as a validation error instead of an NPE.
                if (map.get(policyName) == null) {
                    throw new ValidationException("Unknown lifecycle policy: " + policyName
                            + " for cluster: " + clusterName);
                }
                map.get(policyName).validate(feed, clusterName);
            }
        }
    }

    /**
     * Ensures the lifecycle retention frequency is not shorter than the feed's own frequency.
     */
    private void validateRetentionFrequency(Feed feed, String clusterName) throws FalconException {
        Frequency retentionFrequency = FeedHelper.getLifecycleRetentionFrequency(feed, clusterName);
        Frequency feedFrequency = feed.getFrequency();
        if (DateUtil.getFrequencyInMillis(retentionFrequency) < DateUtil.getFrequencyInMillis(feedFrequency)) {
            throw new ValidationException("Retention can not be more frequent than data availability.");
        }
    }

    /**
     * Returns the subset of {@code referenced} entities that are processes.
     */
    private Set<Process> findProcesses(Set<Entity> referenced) {
        Set<Process> processes = new HashSet<Process>();
        for (Entity entity : referenced) {
            if (entity.getEntityType() == EntityType.PROCESS) {
                processes.add((Process) entity);
            }
        }
        return processes;
    }

    /**
     * For every cluster with an SLA, checks that slaLow is not after slaHigh, and that slaHigh is not
     * after the cluster's retention limit. All expressions are evaluated relative to the current time.
     */
    private void validateFeedSLA(Feed feed) throws FalconException {
        for (Cluster cluster : feed.getClusters().getClusters()) {
            Sla clusterSla = FeedHelper.getSLA(cluster, feed);
            if (clusterSla == null) {
                continue;
            }

            Frequency slaLowExpression = clusterSla.getSlaLow();
            ExpressionHelper evaluator = ExpressionHelper.get();
            ExpressionHelper.setReferenceDate(new Date());
            Date slaLow = new Date(evaluator.evaluate(slaLowExpression.toString(), Long.class));

            Frequency slaHighExpression = clusterSla.getSlaHigh();
            Date slaHigh = new Date(evaluator.evaluate(slaHighExpression.toString(), Long.class));

            if (slaLow.after(slaHigh)) {
                throw new ValidationException("slaLow of Feed: " + slaLowExpression
                        + " is greater than slaHigh: " + slaHighExpression
                        + " for cluster: " + cluster.getName()
                );
            }

            // slaHigh must not exceed how long the data is retained.
            Frequency retentionExpression = cluster.getRetention().getLimit();
            Date retention = new Date(evaluator.evaluate(retentionExpression.toString(), Long.class));
            if (slaHigh.after(retention)) {
                throw new ValidationException("slaHigh of Feed: " + slaHighExpression
                        + " is greater than retention of the feed: " + retentionExpression
                        + " for cluster: " + cluster.getName()
                );
            }
        }
    }

    /**
     * Checks that every cluster's data path has the same date pattern as the feed's default path.
     * Also checks that the feed is compatible with each existing group it declares membership in.
     */
    private void validateFeedGroups(Feed feed) throws FalconException {
        String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[]{};
        final Storage storage = FeedHelper.createStorage(feed);
        String defaultPath = storage.getUriTemplate(LocationType.DATA);
        for (Cluster cluster : feed.getClusters().getClusters()) {
            final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate(LocationType.DATA);
            if (!FeedGroup.getDatePattern(uriTemplate).equals(
                    FeedGroup.getDatePattern(defaultPath))) {
                throw new ValidationException("Feeds default path pattern: "
                        + defaultPath
                        + ", does not match with cluster: "
                        + cluster.getName()
                        + " path pattern: "
                        + uriTemplate);
            }
        }
        for (String groupName : groupNames) {
            FeedGroup group = FeedGroupMap.get().getGroupsMapping().get(groupName);
            if (group != null && !group.canContainFeed(feed)) {
                // Report the path template itself, not the Storage object's toString().
                throw new ValidationException(
                        "Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
                                + ", path pattern: " + defaultPath
                                + " does not match with group: " + group.getName() + "'s frequency: "
                                + group.getFrequency()
                                + ", date pattern: " + group.getDatePattern());
            }
        }
    }

    /**
     * Re-validates each dependent process against the new feed definition. Failures are wrapped
     * with the process name.
     */
    private void ensureValidityFor(Feed newFeed, Set<Process> processes) throws FalconException {
        for (Process process : processes) {
            try {
                ensureValidityFor(newFeed, process);
            } catch (FalconException e) {
                throw new ValidationException(
                        "Process " + process.getName() + " is not compatible " + "with changes to feed "
                                + newFeed.getName(), e);
            }
        }
    }

    /**
     * On every cluster of {@code process}, re-runs the cross-entity checks for each process input
     * and output that references {@code newFeed}.
     */
    private void ensureValidityFor(Feed newFeed, Process process) throws FalconException {
        for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
            String clusterName = cluster.getName();
            if (process.getInputs() != null) {
                for (Input input : process.getInputs().getInputs()) {
                    if (!input.getFeed().equals(newFeed.getName())) {
                        continue;
                    }
                    CrossEntityValidations.validateFeedDefinedForCluster(newFeed, clusterName);
                    CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), newFeed, clusterName);
                    CrossEntityValidations.validateInstanceRange(process, input, newFeed);

                    validateInputPartition(newFeed, input);
                }
            }

            if (process.getOutputs() != null) {
                for (Output output : process.getOutputs().getOutputs()) {
                    if (!output.getFeed().equals(newFeed.getName())) {
                        continue;
                    }
                    CrossEntityValidations.validateFeedDefinedForCluster(newFeed, clusterName);
                    CrossEntityValidations.validateInstance(process, output, newFeed);
                }
            }
            LOG.debug("Verified and found {} to be valid for new definition of {}",
                    process.getName(), newFeed.getName());
        }
    }

    /**
     * Validates an input's partition expression. Input partitions are checked for filesystem
     * storage and rejected for table storage.
     */
    private void validateInputPartition(Feed newFeed, Input input) throws FalconException {
        if (input.getPartition() == null) {
            return;
        }

        final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(newFeed);
        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
            CrossEntityValidations.validateInputPartition(input, newFeed);
        } else if (baseFeedStorageType == Storage.TYPE.TABLE) {
            throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
        }
    }

    /**
     * Ensures the validity window of a feed cluster is defined and that start is not after end.
     *
     * @throws ValidationException if either bound is missing or start is after end
     */
    private void validateClusterValidity(Date start, Date end, String clusterName) throws FalconException {
        if (start == null || end == null) {
            throw new ValidationException("Feed validity start and end must be defined for cluster: "
                    + clusterName);
        }
        // Thrown directly: re-wrapping it (as before) replaced the message with the exception's toString().
        if (start.after(end)) {
            throw new ValidationException("Feed start time: " + start + " cannot be after feed end time: " + end
                    + " for cluster: " + clusterName);
        }
    }

    /**
     * Ensures the cluster's retention limit is not shorter than the feed's late-arrival cut-off,
     * when one is set.
     */
    private void validateFeedCutOffPeriod(Feed feed, Cluster cluster) throws FalconException {
        ExpressionHelper evaluator = ExpressionHelper.get();

        String feedRetention = cluster.getRetention().getLimit().toString();
        long retentionPeriod = evaluator.evaluate(feedRetention, Long.class);

        if (feed.getLateArrival() == null) {
            LOG.debug("Feed's late arrival cut-off not set");
            return;
        }
        String feedCutoff = feed.getLateArrival().getCutOff().toString();
        long feedCutOffPeriod = evaluator.evaluate(feedCutoff, Long.class);

        if (retentionPeriod < feedCutOffPeriod) {
            throw new ValidationException(
                    "Feed's retention limit: " + feedRetention + " of referenced cluster " + cluster.getName()
                            + " should be more than feed's late arrival cut-off period: " + feedCutoff + " for feed: "
                            + feed.getName());
        }
    }

    /**
     * Validates cluster uniqueness and the source/target partition expressions.
     *
     * <ul>
     *   <li>Duplicate cluster names are rejected.</li>
     *   <li>Target clusters require at least one source cluster.</li>
     *   <li>With several source clusters and at least one target, each source needs a partition
     *       expression containing a cluster expression.</li>
     *   <li>For every target, the combined source+target partition depth must not exceed the number
     *       of partitions declared on the feed.</li>
     * </ul>
     */
    private void validateFeedPartitionExpression(Feed feed) throws FalconException {
        int numSourceClusters = 0;
        int numTrgClusters = 0;
        Set<String> clusters = new HashSet<String>();
        for (Cluster cl : feed.getClusters().getClusters()) {
            if (!clusters.add(cl.getName())) {
                throw new ValidationException("Cluster: " + cl.getName()
                        + " is defined more than once for feed: " + feed.getName());
            }
            if (cl.getType() == ClusterType.SOURCE) {
                numSourceClusters++;
            } else if (cl.getType() == ClusterType.TARGET) {
                numTrgClusters++;
            }
        }

        if (numTrgClusters >= 1 && numSourceClusters == 0) {
            throw new ValidationException("Feed: " + feed.getName()
                    + " should have at least one source cluster defined");
        }

        int feedParts = feed.getPartitions() != null ? feed.getPartitions().getPartitions().size() : 0;

        for (Cluster cluster : feed.getClusters().getClusters()) {

            if (cluster.getType() == ClusterType.SOURCE && numSourceClusters > 1 && numTrgClusters >= 1) {
                String part = FeedHelper.normalizePartitionExpression(cluster.getPartition());
                if (StringUtils.split(part, '/').length == 0) {
                    throw new ValidationException(
                            "Partition expression has to be specified for cluster " + cluster.getName()
                                    + " as there are more than one source clusters");
                }
                validateClusterExpDefined(cluster);

            } else if (cluster.getType() == ClusterType.TARGET) {

                for (Cluster src : feed.getClusters().getClusters()) {
                    if (src.getType() == ClusterType.SOURCE) {
                        String part = FeedHelper.normalizePartitionExpression(src.getPartition(),
                                cluster.getPartition());
                        int numParts = StringUtils.split(part, '/').length;
                        if (numParts > feedParts) {
                            throw new ValidationException(
                                    "Partition for " + src.getName() + " and " + cluster.getName()
                                            + " clusters is more than the number of partitions defined in feed");
                        }
                    }
                }

                if (numTrgClusters > 1 && numSourceClusters >= 1) {
                    validateClusterExpDefined(cluster);
                }
            }
        }
    }

    /**
     * Requires at least one tag of the cluster's partition expression to be a cluster expression.
     * A cluster expression is one that changes when evaluated against the cluster entity.
     * A cluster without a partition expression is accepted.
     */
    private void validateClusterExpDefined(Cluster cl) throws FalconException {
        if (cl.getPartition() == null) {
            return;
        }

        org.apache.falcon.entity.v0.cluster.Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, cl.getName());
        String part = FeedHelper.normalizePartitionExpression(cl.getPartition());
        if (FeedHelper.evaluateClusterExp(cluster, part).equals(part)) {
            throw new ValidationException(
                    "At least one of the partition tags has to be a cluster expression for cluster " + cl.getName());
        }
    }

    /**
     * Validates storage-level constraints.
     *
     * <ul>
     *   <li>Table storage allows only a single source cluster.</li>
     *   <li>Every cluster must use the feed's storage type.</li>
     *   <li>Table storage allows no feed-level partitions.</li>
     *   <li>Catalog tables must already exist on clusters this server is responsible for.</li>
     * </ul>
     *
     * <p>FileSystem storage skips the table-specific checks.
     */
    private void validateFeedStorage(Feed feed) throws FalconException {
        final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
        validateMultipleSourcesExist(feed, baseFeedStorageType);
        validateUniformStorageType(feed, baseFeedStorageType);
        validatePartitions(feed, baseFeedStorageType);
        validateStorageExists(feed);
    }

    /**
     * Rejects table-storage feeds that declare more than one source cluster.
     */
    private void validateMultipleSourcesExist(Feed feed, Storage.TYPE baseFeedStorageType) throws FalconException {
        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
            return;
        }

        int numberOfSourceClusters = 0;
        for (Cluster cluster : feed.getClusters().getClusters()) {
            if (cluster.getType() == ClusterType.SOURCE) {
                numberOfSourceClusters++;
            }
        }

        if (numberOfSourceClusters > 1) {
            throw new ValidationException("Multiple sources are not supported for feed with table storage: "
                    + feed.getName());
        }
    }

    /**
     * Ensures every cluster of the feed uses the same storage type as the feed itself.
     */
    private void validateUniformStorageType(Feed feed, Storage.TYPE feedStorageType) throws FalconException {
        for (Cluster cluster : feed.getClusters().getClusters()) {
            Storage.TYPE feedClusterStorageType = FeedHelper.getStorageType(feed, cluster);

            if (feedStorageType != feedClusterStorageType) {
                throw new ValidationException("The storage type is not uniform for cluster: " + cluster.getName());
            }
        }
    }

    /**
     * For table storage, requires the referenced cluster entity to define a registry endpoint.
     */
    private void validateClusterHasRegistry(Feed feed, Cluster cluster) throws FalconException {
        Storage.TYPE feedClusterStorageType = FeedHelper.getStorageType(feed, cluster);

        if (feedClusterStorageType != Storage.TYPE.TABLE) {
            return;
        }

        org.apache.falcon.entity.v0.cluster.Cluster clusterEntity = EntityUtil.getEntity(EntityType.CLUSTER,
                cluster.getName());
        if (ClusterHelper.getRegistryEndPoint(clusterEntity) == null) {
            throw new ValidationException("Cluster should have registry interface defined: " + clusterEntity.getName());
        }
    }

    /**
     * Rejects feed-level partitions for table storage; there they belong in the table URI.
     */
    private void validatePartitions(Feed feed, Storage.TYPE storageType) throws FalconException {
        if (storageType == Storage.TYPE.TABLE && feed.getPartitions() != null) {
            throw new ValidationException("Partitions are not supported for feeds with table storage. "
                    + "It should be defined as part of the table URI. "
                    + feed.getName());
        }
    }

    /**
     * Checks, for table storage on each cluster whose colo this server is responsible for, that the
     * catalog table exists. All missing tables are reported together in one exception.
     */
    private void validateStorageExists(Feed feed) throws FalconException {
        StringBuilder buffer = new StringBuilder();
        for (Cluster cluster : feed.getClusters().getClusters()) {
            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
                    EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
            if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
                continue;
            }

            final Storage storage = FeedHelper.createStorage(cluster, feed);
            // Only table storage is checked here; filesystem storage is skipped.
            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
                continue;
            }

            CatalogStorage catalogStorage = (CatalogStorage) storage;
            Configuration clusterConf = ClusterHelper.getConfiguration(clusterEntity);
            if (!CatalogServiceFactory.getCatalogService().tableExists(
                    clusterConf, catalogStorage.getCatalogUrl(),
                    catalogStorage.getDatabase(), catalogStorage.getTable())) {
                // Separate entries so several missing tables stay readable in one message.
                if (buffer.length() > 0) {
                    buffer.append("; ");
                }
                buffer.append("Table [")
                        .append(catalogStorage.getTable())
                        .append("] does not exist for feed: ")
                        .append(feed.getName())
                        .append(" in cluster: ")
                        .append(cluster.getName());
            }
        }

        if (buffer.length() > 0) {
            throw new ValidationException(buffer.toString());
        }
    }

    /**
     * Validate ACL if authorization is enabled.
     *
     * <p>Checks the ACL owner/group and authorizes the caller against the feed ACL. It then asks the
     * storage of every cluster whose colo this server is responsible for to validate the ACL.
     *
     * @param feed Feed entity
     * @throws FalconException (as ValidationException) if any ACL check fails
     */
    protected void validateACL(Feed feed) throws FalconException {
        if (isAuthorizationDisabled) {
            return;
        }

        final ACL feedACL = feed.getACL();
        validateACLOwnerAndGroup(feedACL);
        try {
            authorize(feed.getName(), feedACL);
        } catch (AuthorizationException e) {
            throw new ValidationException(e);
        }

        for (Cluster cluster : feed.getClusters().getClusters()) {
            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
                    EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
            if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
                continue;
            }

            final Storage storage = FeedHelper.createStorage(cluster, feed);
            try {
                storage.validateACL(feedACL);
            } catch (FalconException e) {
                throw new ValidationException(e);
            }
        }
    }

    /**
     * Validate Hadoop cluster queue names specified in the Feed entity definition.
     *
     * <p>Collects the queues the feed uses on each cluster: the retention queue, plus the replication
     * queue for target clusters. Each queue is checked against the queue names reported by the
     * cluster's YARN RM web address.
     *
     * <p>The check runs only if the cluster entity defines the
     * {@code yarn.resourcemanager.webapp.https.address} or
     * {@code yarn.resourcemanager.webapp.address} property. It also runs only if the feed names at
     * least one queue for that cluster.
     *
     * @param feed feed entity
     * @throws FalconException if a specified queue name is not a valid Hadoop cluster queue
     */
    protected void validateHadoopQueue(Feed feed) throws FalconException {
        for (Cluster cluster : feed.getClusters().getClusters()) {
            Set<String> feedQueue = getQueueNamesUsedInFeed(feed, cluster);
            if (feedQueue.isEmpty()) {
                // Nothing to validate; avoid a needless call to the resource manager.
                continue;
            }

            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
                    EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());

            String rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.https.address");
            if (StringUtils.isBlank(rmURL)) {
                rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.address");
            }

            if (StringUtils.isNotBlank(rmURL)) {
                LOG.info("Fetching hadoop queue names from cluster {} RM URL {}", cluster.getName(), rmURL);
                Set<String> queueNames = HadoopQueueUtil.getHadoopClusterQueueNames(rmURL);

                for (String q : feedQueue) {
                    if (queueNames.contains(q)) {
                        LOG.info("Validated presence of queue specified in feed - {}", q);
                    } else {
                        String strMsg = String.format("The hadoop queue name %s specified "
                                + "for cluster %s is invalid.", q, cluster.getName());
                        LOG.info(strMsg);
                        throw new FalconException(strMsg);
                    }
                }
            }
        }
    }

    /**
     * Returns the non-blank queue names the feed uses on {@code cluster}. These are the retention
     * queue, plus the replication queue when the cluster is a target.
     */
    protected Set<String> getQueueNamesUsedInFeed(Feed feed, Cluster cluster) throws FalconException {
        Set<String> queueList = new HashSet<>();
        addToQueueList(FeedHelper.getRetentionQueue(feed, cluster), queueList);
        if (cluster.getType() == ClusterType.TARGET) {
            addToQueueList(FeedHelper.getReplicationQueue(feed, cluster), queueList);
        }
        return queueList;
    }

    /**
     * Adds {@code queueName} to {@code queueList} when it is set. Null/blank names mean "not
     * configured" and are skipped. The previous inverted check kept only those and dropped real
     * queue names.
     */
    private void addToQueueList(String queueName, Set<String> queueList) {
        if (StringUtils.isNotBlank(queueName)) {
            queueList.add(queueName);
        }
    }

    /**
     * Ensures feed property names are non-blank and unique.
     *
     * @param feed feed entity
     * @throws ValidationException on a blank or duplicate property name
     */
    protected void validateProperties(Feed feed) throws ValidationException {
        Properties properties = feed.getProperties();
        if (properties == null) {
            return; // feed has no properties to validate.
        }

        List<Property> propertyList = properties.getProperties();
        Set<String> propertyKeys = new HashSet<String>();
        for (Property prop : propertyList) {
            if (StringUtils.isBlank(prop.getName())) {
                throw new ValidationException("Property name and value cannot be empty for Feed : "
                        + feed.getName());
            }
            if (!propertyKeys.add(prop.getName())) {
                throw new ValidationException("Multiple properties with same name found for Feed : "
                        + feed.getName());
            }
        }
    }

    /**
     * Validate if FileSystem based feed contains location type data.
     *
     * @param feed Feed entity
     * @throws FalconException if a cluster of a filesystem feed has no DATA location
     */
    private void validateFeedPath(Feed feed) throws FalconException {
        if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
            return;
        }

        for (Cluster cluster : feed.getClusters().getClusters()) {
            List<Location> locations = FeedHelper.getLocations(cluster, feed);
            Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);

            if (dataLocation == null) {
                throw new ValidationException(feed.getName() + " is a FileSystem based feed "
                        + "but it doesn't contain location type - data in cluster " + cluster.getName());
            }
        }
    }

    /**
     * Validate the extraction method and merge policy combination of an import policy.
     *
     * <p>Only ExtractMethod = FULL with MergeType = SNAPSHOT and no delta column is currently
     * accepted. Every other extraction method (including INCREMENTAL) is rejected as unsupported.
     *
     * @param feed Feed entity
     * @param cluster Cluster referenced in the Feed definition
     * @throws FalconException if the combination is not supported
     */
    private void validateFeedExtractionType(Feed feed, Cluster cluster) throws FalconException {
        Extract extract = cluster.getImport().getSource().getExtract();

        if (ExtractMethod.FULL == extract.getType()) {
            if ((MergeType.SNAPSHOT != extract.getMergepolicy())
                    || (extract.getDeltacolumn() != null)) {
                throw new ValidationException(String.format("Feed %s is using FULL "
                        + "extract method but specifies either a superfluous "
                        + "deltacolumn or a mergepolicy other than snapshot", feed.getName()));
            }
        } else {
            throw new ValidationException(String.format("Feed %s is using unsupported "
                    + "extraction mechanism %s", feed.getName(), extract.getType().value()));
        }
    }

    /**
     * Validate import arguments.
     *
     * @param feedCluster Cluster referenced in the feed
     */
    private void validateFeedImportArgs(Cluster feedCluster) throws FalconException {
        Map<String, String> args = FeedHelper.getImportArguments(feedCluster);
        validateSqoopArgs(args);
    }

    /**
     * Validate sqoop arguments: {@code --split-by} is required when {@code --num-mappers} exceeds 1.
     *
     * @param args Map&lt;String, String&gt; arguments
     * @throws FalconException if {@code --num-mappers} is not an integer or {@code --split-by} is missing
     */
    private void validateSqoopArgs(Map<String, String> args) throws FalconException {
        int numMappers = 1;
        if (args.containsKey("--num-mappers")) {
            String value = args.get("--num-mappers");
            try {
                numMappers = Integer.parseInt(value);
            } catch (NumberFormatException e) {
                // Surface a malformed value as a validation error rather than an unchecked exception.
                throw new ValidationException("Feed import argument --num-mappers must be an integer, found: "
                        + value, e);
            }
        }
        if (numMappers > 1 && !args.containsKey("--split-by")) {
            throw new ValidationException("Feed import expects --split-by column when --num-mappers > 1");
        }
    }

    /**
     * Rejects field excludes in an import policy; they are not supported yet.
     */
    private void validateFeedImportFieldExcludes(Cluster feedCluster) throws FalconException {
        if (FeedHelper.isFieldExcludes(feedCluster.getImport().getSource())) {
            throw new ValidationException("Field excludes are not supported currently in Feed import policy");
        }
    }

    /**
     * Validate export arguments against the set of supported sqoop export options.
     *
     * @param feedCluster Cluster referenced in the feed
     * @throws FalconException on the first unsupported argument
     */
    private void validateFeedExportArgs(Cluster feedCluster) throws FalconException {
        Map<String, String> args = FeedHelper.getExportArguments(feedCluster);
        Map<String, String> validArgs = new HashMap<>();
        validArgs.put("--num-mappers", "");
        validArgs.put("--update-key", "");
        validArgs.put("--input-null-string", "");
        validArgs.put("--input-null-non-string", "");

        for (String key : args.keySet()) {
            if (!validArgs.containsKey(key)) {
                throw new ValidationException(String.format("Feed export argument %s is invalid.", key));
            }
        }
    }

    /**
     * Rejects field excludes in an export policy; they are not supported yet.
     */
    private void validateFeedExportFieldExcludes(Cluster feedCluster) throws FalconException {
        if (FeedHelper.isFieldExcludes(feedCluster.getExport().getTarget())) {
            throw new ValidationException("Field excludes are not supported currently in Feed export policy");
        }
    }

}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java deleted file mode 100644 index 16fd8b3..0000000 --- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java +++ /dev/null @@ -1,369 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.parser; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.ClusterHelper; -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.Storage; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.process.Properties; -import org.apache.falcon.entity.v0.process.Property; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.ACL; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Inputs; -import org.apache.falcon.entity.v0.process.LateInput; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Outputs; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.expression.ExpressionHelper; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.util.DateUtil; -import org.apache.falcon.util.HadoopQueueUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; - -/** - * Concrete Parser which has XML parsing and validation logic for Process XML. 
- */ -public class ProcessEntityParser extends EntityParser<Process> { - - private static final Logger LOG = LoggerFactory.getLogger(ProcessEntityParser.class); - - public ProcessEntityParser() { - super(EntityType.PROCESS); - } - - @Override - public void validate(Process process) throws FalconException { - if (process.getTimezone() == null) { - process.setTimezone(TimeZone.getTimeZone("UTC")); - } - - validateACL(process); - // check if dependent entities exists - Set<String> clusters = new HashSet<String>(); - for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { - String clusterName = cluster.getName(); - if (!clusters.add(cluster.getName())) { - throw new ValidationException("Cluster: " + cluster.getName() - + " is defined more than once for process: " + process.getName()); - } - validateEntityExists(EntityType.CLUSTER, clusterName); - - // Optinal end_date - if (cluster.getValidity().getEnd() == null) { - cluster.getValidity().setEnd(DateUtil.NEVER); - } - - validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd()); - validateHDFSPaths(process, clusterName); - validateProperties(process); - - if (process.getInputs() != null) { - for (Input input : process.getInputs().getInputs()) { - validateEntityExists(EntityType.FEED, input.getFeed()); - Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed()); - CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName); - CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName); - CrossEntityValidations.validateInstanceRange(process, input, feed); - validateInputPartition(input, feed); - validateOptionalInputsForTableStorage(feed, input); - } - } - - if (process.getOutputs() != null) { - for (Output output : process.getOutputs().getOutputs()) { - validateEntityExists(EntityType.FEED, output.getFeed()); - Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed()); - 
CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName); - CrossEntityValidations.validateInstance(process, output, feed); - } - } - } - validateDatasetName(process.getInputs(), process.getOutputs()); - validateLateInputs(process); - validateProcessSLA(process); - validateHadoopQueue(process); - } - - - private void validateProcessSLA(Process process) throws FalconException { - if (process.getSla() != null) { - ExpressionHelper evaluator = ExpressionHelper.get(); - ExpressionHelper.setReferenceDate(new Date()); - Frequency shouldStartExpression = process.getSla().getShouldStartIn(); - Frequency shouldEndExpression = process.getSla().getShouldEndIn(); - Frequency timeoutExpression = process.getTimeout(); - - if (shouldStartExpression != null){ - Date shouldStart = new Date(evaluator.evaluate(shouldStartExpression.toString(), Long.class)); - - if (shouldEndExpression != null) { - Date shouldEnd = new Date(evaluator.evaluate(shouldEndExpression.toString(), Long.class)); - if (shouldStart.after(shouldEnd)) { - throw new ValidationException("shouldStartIn of Process: " + shouldStartExpression - + "is greater than shouldEndIn: " - + shouldEndExpression); - } - } - - if (timeoutExpression != null) { - Date timeout = new Date(evaluator.evaluate(timeoutExpression.toString(), Long.class)); - if (timeout.before(shouldStart)) { - throw new ValidationException("shouldStartIn of Process: " + shouldStartExpression - + " is greater than timeout: " + process.getTimeout()); - } - } - } - } - } - /** - * Validate if the user submitting this entity has access to the specific dirs on HDFS. 
- * - * @param process process - * @param clusterName cluster the process is materialized on - * @throws FalconException - */ - private void validateHDFSPaths(Process process, String clusterName) throws FalconException { - org.apache.falcon.entity.v0.cluster.Cluster cluster = - ConfigurationStore.get().get(EntityType.CLUSTER, clusterName); - - if (!EntityUtil.responsibleFor(cluster.getColo())) { - return; - } - - String workflowPath = process.getWorkflow().getPath(); - String libPath = process.getWorkflow().getLib(); - String nameNode = getNameNode(cluster); - try { - Configuration configuration = ClusterHelper.getConfiguration(cluster); - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(configuration); - if (!fs.exists(new Path(workflowPath))) { - throw new ValidationException( - "Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode); - } - - if (StringUtils.isNotBlank(libPath)) { - String[] libPaths = libPath.split(EntityUtil.WF_LIB_SEPARATOR); - for (String path : libPaths) { - if (!fs.exists(new Path(path))) { - throw new ValidationException("Lib path: " + path + " does not exists in HDFS: " + nameNode); - } - } - } - } catch (IOException e) { - throw new FalconException("Error validating workflow path " + workflowPath, e); - } - } - - private String getNameNode(Cluster cluster) throws ValidationException { - // cluster should never be null as it is validated while submitting feeds. 
- if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) { - throw new ValidationException( - "Cannot get valid nameNode scheme from write interface of cluster: " + cluster.getName()); - } - return ClusterHelper.getStorageUrl(cluster); - } - - private void validateProcessValidity(Date start, Date end) throws FalconException { - try { - if (!start.before(end)) { - throw new ValidationException( - "Process start time: " + start + " should be before process end time: " + end); - } - } catch (ValidationException e) { - throw new ValidationException(e); - } catch (Exception e) { - throw new FalconException(e); - } - } - - private void validateInputPartition(Input input, Feed feed) throws FalconException { - if (input.getPartition() == null) { - return; - } - - final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed); - if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) { - CrossEntityValidations.validateInputPartition(input, feed); - } else if (baseFeedStorageType == Storage.TYPE.TABLE) { - throw new ValidationException("Input partitions are not supported for table storage: " + input.getName()); - } - } - - private void validateDatasetName(Inputs inputs, Outputs outputs) throws ValidationException { - Set<String> datasetNames = new HashSet<String>(); - if (inputs != null) { - for (Input input : inputs.getInputs()) { - if (!datasetNames.add(input.getName())) { - throw new ValidationException("Input name: " + input.getName() + " is already used"); - } - } - } - - if (outputs != null) { - for (Output output : outputs.getOutputs()) { - if (!datasetNames.add(output.getName())) { - throw new ValidationException("Output name: " + output.getName() + " is already used"); - } - } - } - } - - private void validateLateInputs(Process process) throws ValidationException { - if (process.getLateProcess() == null) { - return; - } - - Map<String, String> feeds = new HashMap<String, String>(); - if (process.getInputs() != null) { - for (Input in : 
process.getInputs().getInputs()) { - feeds.put(in.getName(), in.getFeed()); - } - } - - for (LateInput lp : process.getLateProcess().getLateInputs()) { - if (!feeds.keySet().contains(lp.getInput())) { - throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs"); - } - - try { - Feed feed = ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput())); - if (feed.getLateArrival() == null) { - throw new ValidationException( - "Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off"); - } - } catch (FalconException e) { - throw new ValidationException(e); - } - } - } - - private void validateOptionalInputsForTableStorage(Feed feed, Input input) throws FalconException { - if (input.isOptional() && FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) { - throw new ValidationException("Optional Input is not supported for feeds with table storage! " - + input.getName()); - } - } - - /** - * Validate ACL if authorization is enabled. - * - * @param process process entity - * @throws ValidationException - */ - protected void validateACL(Process process) throws FalconException { - if (isAuthorizationDisabled) { - return; - } - - // Validate the entity owner is logged-in, authenticated user if authorization is enabled - ACL processACL = process.getACL(); - if (processACL == null) { - throw new ValidationException("Process ACL cannot be empty for: " + process.getName()); - } - - validateACLOwnerAndGroup(processACL); - - try { - authorize(process.getName(), processACL); - } catch (AuthorizationException e) { - throw new ValidationException(e); - } - } - - protected void validateProperties(Process process) throws ValidationException { - Properties properties = process.getProperties(); - if (properties == null) { - return; // Cluster has no properties to validate. 
- } - - List<Property> propertyList = process.getProperties().getProperties(); - HashSet<String> propertyKeys = new HashSet<String>(); - for (Property prop : propertyList) { - if (StringUtils.isBlank(prop.getName())) { - throw new ValidationException("Property name and value cannot be empty for Process : " - + process.getName()); - } - if (!propertyKeys.add(prop.getName())) { - throw new ValidationException("Multiple properties with same name found for Process : " - + process.getName()); - } - } - } - - private void validateHadoopQueue(Process process) throws FalconException { - // get queue name specified in the process entity - String processQueueName = null; - java.util.Properties props = EntityUtil.getEntityProperties(process); - if ((props != null) && (props.containsKey(EntityUtil.MR_QUEUE_NAME))) { - processQueueName = props.getProperty(EntityUtil.MR_QUEUE_NAME); - } else { - return; - } - - // iterate through each cluster in process entity to check if the cluster has the process entity queue - for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { - String clusterName = cluster.getName(); - org.apache.falcon.entity.v0.cluster.Cluster clusterEntity = - ConfigurationStore.get().get(EntityType.CLUSTER, clusterName); - - String rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.https.address"); - if (rmURL == null) { - rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.address"); - } - - if (rmURL != null) { - LOG.info("Fetching hadoop queue names from cluster {} RM URL {}", cluster.getName(), rmURL); - Set<String> queueNames = HadoopQueueUtil.getHadoopClusterQueueNames(rmURL); - - if (queueNames.contains(processQueueName)) { - LOG.info("Validated presence of queue {} specified in process " - + "entity for cluster {}", processQueueName, clusterName); - } else { - String strMsg = String.format("The hadoop queue name %s specified in process " - + "entity for 
cluster %s is invalid.", processQueueName, cluster.getName()); - LOG.info(strMsg); - throw new FalconException(strMsg); - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java b/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java deleted file mode 100644 index 98f1cb9..0000000 --- a/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity.parser; - -import org.apache.falcon.FalconException; - -/** - * ValidationException during parsing. 
- */ -public class ValidationException extends FalconException { - - public ValidationException(String message) { - super(message); - } - - public ValidationException(Exception e) { - super(e); - } - - public ValidationException(String message, Exception e) { - super(message, e); - } - - private static final long serialVersionUID = -4502166408759507355L; - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java deleted file mode 100644 index 9c7a932..0000000 --- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java +++ /dev/null @@ -1,435 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.store; - -import org.apache.commons.codec.CharEncoding; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.AccessControlList; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.service.ConfigurationChangeListener; -import org.apache.falcon.service.FalconService; -import org.apache.falcon.util.ReflectionUtils; -import org.apache.falcon.util.StartupProperties; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.xml.bind.JAXBException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -/** - * Persistent store for falcon entities. 
- */ -public final class ConfigurationStore implements FalconService { - - private static final EntityType[] ENTITY_LOAD_ORDER = new EntityType[] { - EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, EntityType.DATASOURCE, }; - public static final EntityType[] ENTITY_DELETE_ORDER = new EntityType[] { EntityType.PROCESS, EntityType.FEED, - EntityType.CLUSTER, }; - - private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class); - private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT"); - private static final String UTF_8 = CharEncoding.UTF_8; - private final boolean shouldPersist; - - private static final FsPermission STORE_PERMISSION = - new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); - - private Set<ConfigurationChangeListener> listeners = new LinkedHashSet<ConfigurationChangeListener>(); - - private ThreadLocal<Entity> updatesInProgress = new ThreadLocal<Entity>(); - - private final Map<EntityType, ConcurrentHashMap<String, Entity>> dictionary - = new HashMap<EntityType, ConcurrentHashMap<String, Entity>>(); - - private static final Entity NULL = new Entity() { - @Override - public String getName() { - return "NULL"; - } - - @Override - public String getTags() { return null; } - - @Override - public AccessControlList getACL() { - return null; - } - }; - - private static final ConfigurationStore STORE = new ConfigurationStore(); - - public static ConfigurationStore get() { - return STORE; - } - - private FileSystem fs; - private Path storePath; - - private ConfigurationStore() { - for (EntityType type : EntityType.values()) { - dictionary.put(type, new ConcurrentHashMap<String, Entity>()); - } - - shouldPersist = Boolean.parseBoolean(StartupProperties.get().getProperty("config.store.persist", "true")); - if (shouldPersist) { - String uri = StartupProperties.get().getProperty("config.store.uri"); - storePath = new Path(uri); - fs = initializeFileSystem(); - } - } - - /** - * Falcon owns this dir on HDFS which 
no one has permissions to read. - * - * @return FileSystem handle - */ - private FileSystem initializeFileSystem() { - try { - FileSystem fileSystem = - HadoopClientFactory.get().createFalconFileSystem(storePath.toUri()); - if (!fileSystem.exists(storePath)) { - LOG.info("Creating configuration store directory: {}", storePath); - // set permissions so config store dir is owned by falcon alone - HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION); - } - - return fileSystem; - } catch (Exception e) { - throw new RuntimeException("Unable to bring up config store for path: " + storePath, e); - } - } - - @Override - public void init() throws FalconException { - String listenerClassNames = StartupProperties.get(). - getProperty("configstore.listeners", "org.apache.falcon.entity.v0.EntityGraph"); - for (String listenerClassName : listenerClassNames.split(",")) { - listenerClassName = listenerClassName.trim(); - if (listenerClassName.isEmpty()) { - continue; - } - ConfigurationChangeListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName); - registerListener(listener); - } - - if (shouldPersist) { - for (final EntityType type : ENTITY_LOAD_ORDER) { - loadEntity(type); - } - } - } - - private void loadEntity(final EntityType type) throws FalconException { - try { - final ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type); - FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*")); - if (files != null) { - final ExecutorService service = Executors.newFixedThreadPool(100); - for (final FileStatus file : files) { - service.execute(new Runnable() { - @Override - public void run() { - try { - String fileName = file.getPath().getName(); - String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop - // ".xml" - String entityName = URLDecoder.decode(encodedEntityName, UTF_8); - Entity entity = restore(type, entityName); - entityMap.put(entityName, entity); - } catch 
(IOException | FalconException e) { - LOG.error("Unable to restore entity of", file); - } - } - }); - } - service.shutdown(); - if (service.awaitTermination(10, TimeUnit.MINUTES)) { - LOG.info("Restored Configurations for entity type: {} ", type.name()); - } else { - LOG.warn("Time out happened while waiting for all threads to finish while restoring entities " - + "for type: {}", type.name()); - } - // Checking if all entities were loaded - if (entityMap.size() != files.length) { - throw new FalconException("Unable to restore configurations for entity type " + type.name()); - } - for (Entity entity : entityMap.values()){ - onReload(entity); - } - } - } catch (IOException e) { - throw new FalconException("Unable to restore configurations", e); - } catch (InterruptedException e) { - throw new FalconException("Failed to restore configurations in 10 minutes for entity type " + type.name()); - } - } - - public void registerListener(ConfigurationChangeListener listener) { - listeners.add(listener); - } - - public void unregisterListener(ConfigurationChangeListener listener) { - listeners.remove(listener); - } - - /** - * @param type - EntityType that need to be published - * @param entity - Reference to the Entity Object - * @throws FalconException - */ - public synchronized void publish(EntityType type, Entity entity) throws FalconException { - try { - if (get(type, entity.getName()) == null) { - persist(type, entity); - onAdd(entity); - dictionary.get(type).put(entity.getName(), entity); - } else { - throw new EntityAlreadyExistsException( - entity.toShortString() + " already registered with configuration store. " - + "Can't be submitted again. 
Try removing before submitting."); - } - } catch (IOException e) { - throw new StoreAccessException(e); - } - AUDIT.info(type + "/" + entity.getName() + " is published into config store"); - } - - private synchronized void updateInternal(EntityType type, Entity entity) throws FalconException { - try { - if (get(type, entity.getName()) != null) { - persist(type, entity); - ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type); - Entity oldEntity = entityMap.get(entity.getName()); - onChange(oldEntity, entity); - entityMap.put(entity.getName(), entity); - } else { - throw new FalconException(entity.toShortString() + " doesn't exist"); - } - } catch (IOException e) { - throw new StoreAccessException(e); - } - AUDIT.info(type + "/" + entity.getName() + " is replaced into config store"); - } - - public synchronized void update(EntityType type, Entity entity) throws FalconException { - if (updatesInProgress.get() == entity) { - updateInternal(type, entity); - } else { - throw new FalconException(entity.toShortString() + " is not initialized for update"); - } - } - - private void onAdd(Entity entity) throws FalconException { - for (ConfigurationChangeListener listener : listeners) { - listener.onAdd(entity); - } - } - - private void onChange(Entity oldEntity, Entity newEntity) throws FalconException { - for (ConfigurationChangeListener listener : listeners) { - listener.onChange(oldEntity, newEntity); - } - } - - private void onReload(Entity entity) throws FalconException { - for (ConfigurationChangeListener listener : listeners) { - listener.onReload(entity); - } - } - - public synchronized void initiateUpdate(Entity entity) throws FalconException { - if (get(entity.getEntityType(), entity.getName()) == null || updatesInProgress.get() != null) { - throw new FalconException( - "An update for " + entity.toShortString() + " is already in progress or doesn't exist"); - } - updatesInProgress.set(entity); - } - - /** - * @param type - Entity type that is being 
retrieved - * @param name - Name as it appears in the entity xml definition - * @param <T> - Actual Entity object type - * @return - Entity object from internal dictionary, If the object is not - * loaded in memory yet, it will retrieve it from persistent store - * just in time. On startup all the entities will be added to the - * dictionary with null reference. - * @throws FalconException - */ - @SuppressWarnings("unchecked") - public <T extends Entity> T get(EntityType type, String name) throws FalconException { - ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type); - if (entityMap.containsKey(name)) { - if (updatesInProgress.get() != null && updatesInProgress.get().getEntityType() == type - && updatesInProgress.get().getName().equals(name)) { - return (T) updatesInProgress.get(); - } - T entity = (T) entityMap.get(name); - if (entity == NULL && shouldPersist) { // Object equality being checked - try { - entity = this.restore(type, name); - } catch (IOException e) { - throw new StoreAccessException(e); - } - entityMap.put(name, entity); - return entity; - } else { - return entity; - } - } else { - return null; - } - } - - public Collection<String> getEntities(EntityType type) { - return Collections.unmodifiableCollection(dictionary.get(type).keySet()); - } - - /** - * Remove an entity which is already stored in the config store. 
- * - * @param type - Entity type being removed - * @param name - Name of the entity object being removed - * @return - True is remove is successful, false if request entity doesn't - * exist - * @throws FalconException - */ - public synchronized boolean remove(EntityType type, String name) throws FalconException { - Map<String, Entity> entityMap = dictionary.get(type); - if (entityMap.containsKey(name)) { - try { - archive(type, name); - Entity entity = entityMap.get(name); - onRemove(entity); - entityMap.remove(name); - } catch (IOException e) { - throw new StoreAccessException(e); - } - AUDIT.info(type + " " + name + " is removed from config store"); - return true; - } - return false; - } - - private void onRemove(Entity entity) throws FalconException { - for (ConfigurationChangeListener listener : listeners) { - listener.onRemove(entity); - } - } - - /** - * @param type - Entity type that is to be stored into persistent storage - * @param entity - entity to persist. JAXB Annotated entity will be marshalled - * to the persistent store. The convention used for storing the - * object:: PROP(config.store.uri)/{entitytype}/{entityname}.xml - * @throws java.io.IOException If any error in accessing the storage - * @throws FalconException - */ - private void persist(EntityType type, Entity entity) throws IOException, FalconException { - if (!shouldPersist) { - return; - } - OutputStream out = fs - .create(new Path(storePath, - type + Path.SEPARATOR + URLEncoder.encode(entity.getName(), UTF_8) + ".xml")); - try { - type.getMarshaller().marshal(entity, out); - LOG.info("Persisted configuration {}/{}", type, entity.getName()); - } catch (JAXBException e) { - LOG.error("Unable to serialize the entity object {}/{}", type, entity.getName(), e); - throw new StoreAccessException("Unable to serialize the entity object " + type + "/" + entity.getName(), e); - } finally { - out.close(); - } - } - - /** - * Archive removed configuration in the persistent store. 
- * - * @param type - Entity type to archive - * @param name - name - * @throws IOException If any error in accessing the storage - */ - private void archive(EntityType type, String name) throws IOException { - if (!shouldPersist) { - return; - } - Path archivePath = new Path(storePath, "archive" + Path.SEPARATOR + type); - HadoopClientFactory.mkdirs(fs, archivePath, STORE_PERMISSION); - fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"), - new Path(archivePath, URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis())); - LOG.info("Archived configuration {}/{}", type, name); - } - - /** - * @param type - Entity type to restore from persistent store - * @param name - Name of the entity to restore. - * @param <T> - Actual entity object type - * @return - De-serialized entity object restored from persistent store - * @throws IOException If any error in accessing the storage - * @throws FalconException - */ - @SuppressWarnings("unchecked") - private synchronized <T extends Entity> T restore(EntityType type, String name) - throws IOException, FalconException { - - InputStream in = fs.open(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml")); - try { - return (T) type.getUnmarshaller().unmarshal(in); - } catch (JAXBException e) { - throw new StoreAccessException("Unable to un-marshall xml definition for " + type + "/" + name, e); - } finally { - in.close(); - LOG.info("Restored configuration {}/{}", type, name); - } - } - - public void cleanupUpdateInit() { - updatesInProgress.set(null); - } - - @Override - public String getName() { - return this.getClass().getName(); - } - - @Override - public void destroy() { - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/EntityAlreadyExistsException.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/falcon/entity/store/EntityAlreadyExistsException.java b/common/src/main/java/org/apache/falcon/entity/store/EntityAlreadyExistsException.java deleted file mode 100644 index 28c5ac0..0000000 --- a/common/src/main/java/org/apache/falcon/entity/store/EntityAlreadyExistsException.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity.store; - -import org.apache.falcon.FalconException; - -/** - * Exception to thrown when entity being sought for addition is already present in config store. 
- */ -public class EntityAlreadyExistsException extends FalconException { - - public EntityAlreadyExistsException(Exception e) { - super(e); - } - - public EntityAlreadyExistsException(String message, Exception e) { - super(message, e); - } - - public EntityAlreadyExistsException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java deleted file mode 100644 index a9b7617..0000000 --- a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.store; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.feed.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.resource.FeedLookupResult; -import org.apache.falcon.service.ConfigurationChangeListener; -import org.apache.falcon.util.DeploymentUtil; -import org.apache.falcon.util.FalconRadixUtils; -import org.apache.falcon.util.RadixTree; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.List; - -/** - * A <Key, Value> Store to store FeedProperties against Feed's Locations. - * - * For example: - * let's say a feed - <b>MyFeed</b>, is configured for two clusters - cluster1 and cluster2 and has data location as - * below. - * /projects/myprocess/data/${MONTH}-${DAY}-${HOUR} - * /projects/myprocess/meta/${MONTH}-${DAY}-${HOUR} - * - * then the key,value store will be like below - * key1: /projects/myprocess/data/${MONTH}-${DAY}-${HOUR} - * value1: [FeedProperties("cluster1", LocationType.DATA, "MyFeed"), - * FeedProperties("cluster2", LocationType.DATA, "MyFeed") - * ] - * - * key2: /projects/myprocess/meta/${MONTH}-${DAY}-${HOUR} - * value2: [FeedProperties("cluster1", LocationType.META, "MyFeed"), - * FeedProperties("cluster2", LocationType.META, "MyFeed") - * ] - * - * It ensures that no two Feeds share the same location. 
- * It can also be used for operations like: - * <ul> - * <li>Find if a there is a feed which uses a given path as it's location.</li> - * <li>Find name of the feed, given it's location.</li> - * </ul> - */ -public final class FeedLocationStore implements ConfigurationChangeListener { - - private static final Logger LOG = LoggerFactory.getLogger(FeedLocationStore.class); - protected final FeedPathStore<FeedLookupResult.FeedProperties> store = new - RadixTree<FeedLookupResult.FeedProperties>(); - - private static FeedLocationStore instance = new FeedLocationStore(); - - private FeedLocationStore(){ - } - - public static FeedLocationStore get(){ - return instance; - } - - @Override - public void onAdd(Entity entity) throws FalconException { - if (entity.getEntityType() == EntityType.FEED){ - Feed feed = (Feed) entity; - List<Cluster> clusters = feed.getClusters().getClusters(); - for(Cluster cluster: clusters) { - if (DeploymentUtil.getCurrentClusters().contains(cluster.getName())) { - List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed, - cluster.getName()), feed); - if (clusterSpecificLocations != null) { - for (Location location : clusterSpecificLocations) { - if (location != null && StringUtils.isNotBlank(location.getPath())) { - FeedLookupResult.FeedProperties value = new FeedLookupResult.FeedProperties( - feed.getName(), location.getType(), cluster.getName()); - store.insert(StringUtils.trim(location.getPath()), value); - LOG.debug("Inserted location: {} for feed: {} and cluster: {}", - location.getPath(), feed.getName(), cluster.getName()); - } - } - } - } - } - } - } - - /** - * Delete the key(path) from the store if the feed is deleted. 
- * @param entity entity object - * @throws FalconException - */ - @Override - public void onRemove(Entity entity) throws FalconException { - if (entity.getEntityType() == EntityType.FEED){ - - Feed feed = (Feed) entity; - List<Cluster> clusters = feed.getClusters().getClusters(); - for(Cluster cluster: clusters){ - List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed, - cluster.getName()), feed); - if (clusterSpecificLocations != null) { - for(Location location: clusterSpecificLocations){ - if (location != null && StringUtils.isNotBlank(location.getPath())){ - FeedLookupResult.FeedProperties value = new FeedLookupResult.FeedProperties(feed.getName(), - location.getType(), cluster.getName()); - LOG.debug("Delete called for location: {} for feed: {} and cluster: {}", - location.getPath(), feed.getName(), cluster.getName()); - store.delete(location.getPath(), value); - LOG.debug("Deleted location: {} for feed: {} and cluster: {}", - location.getPath(), feed.getName(), cluster.getName()); - } - } - } - } - } - - } - - /** - * Delete the old path and insert the new Path when the feed is updated. - * @param oldEntity old entity object - * @param newEntity updated entity object - * @throws FalconException if the new path already exists in the store. 
- */ - @Override - public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { - onRemove(oldEntity); - onAdd(newEntity); - } - - @Override - public void onReload(Entity entity) throws FalconException { - onAdd(entity); - } - - - public Collection<FeedLookupResult.FeedProperties> reverseLookup(String path) { - return store.find(path, new FalconRadixUtils.FeedRegexAlgorithm()); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java deleted file mode 100644 index 1be12fe..0000000 --- a/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.store; - -import org.apache.falcon.util.FalconRadixUtils; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.util.Collection; - -/** - * A <Key, Value> Store to store values against Feed Locations. - * - * @param <T> - */ -public interface FeedPathStore<T> { - - void insert(@Nullable String key, @Nonnull T value); - - int getSize(); - - @Nullable - Collection<T> find(@Nonnull String key, @Nonnull FalconRadixUtils.INodeAlgorithm algorithm); - - @Nullable - Collection<T> find(@Nonnull String key); - - boolean delete(@Nonnull String key, @Nonnull T value); - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java b/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java deleted file mode 100644 index 318dc2e..0000000 --- a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.store; - -import org.apache.falcon.FalconException; - -/** - * Exception when there in issue accessing the persistent store. - */ -public class StoreAccessException extends FalconException { - - /** - * @param e Exception - */ - public StoreAccessException(String message, Exception e) { - super(message, e); - } - - public StoreAccessException(Exception e) { - super(e); - } -}
