http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java deleted file mode 100644 index b709857..0000000 --- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java +++ /dev/null @@ -1,381 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.metadata; - -import com.tinkerpop.blueprints.Graph; -import com.tinkerpop.blueprints.Vertex; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.CatalogStorage; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.Storage; -import org.apache.falcon.entity.common.FeedDataPath; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.SchemaHelper; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.workflow.WorkflowExecutionArgs; -import org.apache.falcon.workflow.WorkflowExecutionContext; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.URISyntaxException; -import java.util.Date; -import java.util.TimeZone; - -/** - * Instance Metadata relationship mapping helper. 
- */ -public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { - - private static final Logger LOG = LoggerFactory.getLogger(InstanceRelationshipGraphBuilder.class); - - private static final String FEED_INSTANCE_FORMAT = "yyyyMMddHHmm"; // computed - private static final String NONE = "NONE"; - private static final String IGNORE = "IGNORE"; - - // process workflow properties from message - private static final WorkflowExecutionArgs[] INSTANCE_WORKFLOW_PROPERTIES = { - WorkflowExecutionArgs.USER_WORKFLOW_NAME, - WorkflowExecutionArgs.USER_WORKFLOW_ENGINE, - WorkflowExecutionArgs.WORKFLOW_ID, - WorkflowExecutionArgs.RUN_ID, - WorkflowExecutionArgs.STATUS, - WorkflowExecutionArgs.WF_ENGINE_URL, - WorkflowExecutionArgs.USER_SUBFLOW_ID, - }; - - - public InstanceRelationshipGraphBuilder(Graph graph, boolean preserveHistory) { - super(graph, preserveHistory); - } - - public Vertex addProcessInstance(WorkflowExecutionContext context) throws FalconException { - String processInstanceName = getProcessInstanceName(context); - LOG.info("Adding process instance: {}", processInstanceName); - - Vertex processInstance = addVertex(processInstanceName, - RelationshipType.PROCESS_INSTANCE, context.getTimeStampAsLong()); - addWorkflowInstanceProperties(processInstance, context); - - addInstanceToEntity(processInstance, context.getEntityName(), - RelationshipType.PROCESS_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE); - addInstanceToEntity(processInstance, context.getClusterName(), - RelationshipType.CLUSTER_ENTITY, RelationshipLabel.PROCESS_CLUSTER_EDGE); - addInstanceToEntity(processInstance, context.getWorkflowUser(), - RelationshipType.USER, RelationshipLabel.USER); - - if (isPreserveHistory()) { - Process process = ConfigurationStore.get().get(EntityType.PROCESS, context.getEntityName()); - addDataClassification(process.getTags(), processInstance); - addPipelines(process.getPipelines(), processInstance); - } - - addCounters(processInstance, context); - - 
return processInstance; - } - - private void addCounters(Vertex processInstance, WorkflowExecutionContext context) throws FalconException { - String counterString = getCounterString(context); - if (!StringUtils.isBlank(counterString)) { - addCountersToInstance(counterString, processInstance); - } - } - - private String getCounterString(WorkflowExecutionContext context) { - if (!StringUtils.isBlank(context.getCounters())) { - return context.getCounters(); - } - return null; - } - - public String getProcessInstanceName(WorkflowExecutionContext context) { - return context.getEntityName() + "/" + context.getNominalTimeAsISO8601(); - } - - public void addWorkflowInstanceProperties(Vertex processInstance, - WorkflowExecutionContext context) { - for (WorkflowExecutionArgs instanceWorkflowProperty : INSTANCE_WORKFLOW_PROPERTIES) { - addProperty(processInstance, context, instanceWorkflowProperty); - } - - processInstance.setProperty(RelationshipProperty.VERSION.getName(), - context.getUserWorkflowVersion()); - } - - private void addProperty(Vertex vertex, WorkflowExecutionContext context, - WorkflowExecutionArgs optionName) { - String value = context.getValue(optionName); - if (value == null || value.length() == 0) { - return; - } - - vertex.setProperty(optionName.getName(), value); - } - - private void addCountersToInstance(String counterString, Vertex vertex) throws FalconException { - String[] counterKeyValues = counterString.split(","); - try { - for (String counter : counterKeyValues) { - String[] keyVals = counter.split(":", 2); - vertex.setProperty(keyVals[0], Long.parseLong(keyVals[1])); - } - } catch (NumberFormatException e) { - throw new FalconException("Invalid values for counter:" + e); - } - } - - public void addInstanceToEntity(Vertex instanceVertex, String entityName, - RelationshipType entityType, RelationshipLabel edgeLabel) { - addInstanceToEntity(instanceVertex, entityName, entityType, edgeLabel, null); - } - - public void addInstanceToEntity(Vertex 
instanceVertex, String entityName, - RelationshipType entityType, RelationshipLabel edgeLabel, - String timestamp) { - Vertex entityVertex = findVertex(entityName, entityType); - LOG.info("Vertex exists? name={}, type={}, v={}", entityName, entityType, entityVertex); - if (entityVertex == null) { - LOG.error("Illegal State: {} vertex must exist for {}", entityType, entityName); - throw new IllegalStateException(entityType + " entity vertex must exist " + entityName); - } - - addEdge(instanceVertex, entityVertex, edgeLabel.getName(), timestamp); - } - - public void addOutputFeedInstances(WorkflowExecutionContext context, - Vertex processInstance) throws FalconException { - String outputFeedNamesArg = context.getOutputFeedNames(); - if (NONE.equals(outputFeedNamesArg) || IGNORE.equals(outputFeedNamesArg)) { - return; // there are no output feeds for this process - } - - String[] outputFeedNames = context.getOutputFeedNamesList(); - String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList(); - - for (int index = 0; index < outputFeedNames.length; index++) { - String feedName = outputFeedNames[index]; - String feedInstanceDataPath = outputFeedInstancePaths[index]; - addFeedInstance(processInstance, RelationshipLabel.PROCESS_FEED_EDGE, - context, feedName, feedInstanceDataPath); - } - } - - public void addInputFeedInstances(WorkflowExecutionContext context, - Vertex processInstance) throws FalconException { - String inputFeedNamesArg = context.getInputFeedNames(); - if (NONE.equals(inputFeedNamesArg) || IGNORE.equals(inputFeedNamesArg)) { - return; // there are no input feeds for this process - } - - String[] inputFeedNames = context.getInputFeedNamesList(); - String[] inputFeedInstancePaths = context.getInputFeedInstancePathsList(); - - for (int index = 0; index < inputFeedNames.length; index++) { - String inputFeedName = inputFeedNames[index]; - String inputFeedInstancePath = inputFeedInstancePaths[index]; - // Multiple instance paths for a given feed 
is separated by "," - String[] feedInstancePaths = inputFeedInstancePath.split(","); - - for (String feedInstanceDataPath : feedInstancePaths) { - addFeedInstance(processInstance, RelationshipLabel.FEED_PROCESS_EDGE, - context, inputFeedName, feedInstanceDataPath); - } - } - } - - public void addReplicatedInstance(WorkflowExecutionContext context) throws FalconException { - // For replication there will be only one output feed name and path - String feedName = context.getOutputFeedNames(); - String feedInstanceDataPath = context.getOutputFeedInstancePaths(); - String targetClusterName = context.getClusterName(); - - LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", feedName, - feedInstanceDataPath, targetClusterName); - String feedInstanceName = getFeedInstanceName(feedName, targetClusterName, - feedInstanceDataPath, context.getNominalTimeAsISO8601()); - Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE); - - LOG.info("Vertex exists? 
name={}, type={}, v={}", - feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex); - if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon - LOG.info("{} instance vertex {} does not exist, add it", - RelationshipType.FEED_INSTANCE, feedInstanceName); - feedInstanceVertex = addFeedInstance(// add a new instance - feedInstanceName, context, feedName, context.getSrcClusterName()); - } - - addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY, - RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601()); - - addCounters(feedInstanceVertex, context); - } - - public void addEvictedInstance(WorkflowExecutionContext context) throws FalconException { - final String outputFeedPaths = context.getOutputFeedInstancePaths(); - if (IGNORE.equals(outputFeedPaths)) { - LOG.info("There were no evicted instances, nothing to record"); - return; - } - - LOG.info("Recording lineage for evicted instances {}", outputFeedPaths); - // For retention there will be only one output feed name - String feedName = context.getOutputFeedNames(); - String[] evictedFeedInstancePathList = context.getOutputFeedInstancePathsList(); - String clusterName = context.getClusterName(); - - for (String evictedFeedInstancePath : evictedFeedInstancePathList) { - LOG.info("Computing feed instance for : name= {}, path={}, in cluster: {}", - feedName, evictedFeedInstancePath, clusterName); - String feedInstanceName = getFeedInstanceName(feedName, clusterName, - evictedFeedInstancePath, context.getNominalTimeAsISO8601()); - Vertex feedInstanceVertex = findVertex(feedInstanceName, - RelationshipType.FEED_INSTANCE); - - LOG.info("Vertex exists? 
name={}, type={}, v={}", - feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex); - if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon - LOG.info("{} instance vertex {} does not exist, add it", - RelationshipType.FEED_INSTANCE, feedInstanceName); - feedInstanceVertex = addFeedInstance(// add a new instance - feedInstanceName, context, feedName, clusterName); - } - - addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY, - RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE, context.getTimeStampAsISO8601()); - } - } - - - public void addImportedInstance(WorkflowExecutionContext context) throws FalconException { - - String feedName = context.getOutputFeedNames(); - String feedInstanceDataPath = context.getOutputFeedInstancePaths(); - String datasourceName = context.getDatasourceName(); - String sourceClusterName = context.getSrcClusterName(); - - LOG.info("Computing import feed instance for : name= {} path= {}, in cluster: {} " - + "from datasource: {}", feedName, - feedInstanceDataPath, sourceClusterName, datasourceName); - String feedInstanceName = getFeedInstanceName(feedName, sourceClusterName, - feedInstanceDataPath, context.getNominalTimeAsISO8601()); - Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE); - - LOG.info("Vertex exists? 
name={}, type={}, v={}", - feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex); - if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon - LOG.info("{} instance vertex {} does not exist, add it", - RelationshipType.FEED_INSTANCE, feedInstanceName); - feedInstanceVertex = addFeedInstance(// add a new instance - feedInstanceName, context, feedName, context.getSrcClusterName()); - } - addInstanceToEntity(feedInstanceVertex, datasourceName, RelationshipType.DATASOURCE_ENTITY, - RelationshipLabel.DATASOURCE_IMPORT_EDGE, context.getTimeStampAsISO8601()); - addInstanceToEntity(feedInstanceVertex, sourceClusterName, RelationshipType.CLUSTER_ENTITY, - RelationshipLabel.FEED_CLUSTER_EDGE, context.getTimeStampAsISO8601()); - } - - public String getImportInstanceName(WorkflowExecutionContext context) { - return context.getEntityName() + "/" + context.getNominalTimeAsISO8601(); - } - - private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel, - WorkflowExecutionContext context, String feedName, - String feedInstanceDataPath) throws FalconException { - String clusterName = context.getClusterName(); - LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", feedName, - feedInstanceDataPath, clusterName); - String feedInstanceName = getFeedInstanceName(feedName, clusterName, - feedInstanceDataPath, context.getNominalTimeAsISO8601()); - Vertex feedInstance = addFeedInstance(feedInstanceName, context, feedName, clusterName); - addProcessFeedEdge(processInstance, feedInstance, edgeLabel); - } - - private Vertex addFeedInstance(String feedInstanceName, WorkflowExecutionContext context, - String feedName, String clusterName) throws FalconException { - LOG.info("Adding feed instance {}", feedInstanceName); - Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE, - context.getTimeStampAsLong()); - - addInstanceToEntity(feedInstance, feedName, - 
RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE); - addInstanceToEntity(feedInstance, clusterName, - RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE); - addInstanceToEntity(feedInstance, context.getWorkflowUser(), - RelationshipType.USER, RelationshipLabel.USER); - - if (isPreserveHistory()) { - Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName); - addDataClassification(feed.getTags(), feedInstance); - addGroups(feed.getGroups(), feedInstance); - } - - return feedInstance; - } - - public static String getFeedInstanceName(String feedName, String clusterName, - String feedInstancePath, - String nominalTime) throws FalconException { - try { - Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName); - Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName); - - Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster); - return storageType == Storage.TYPE.TABLE - ? getTableFeedInstanceName(feed, feedInstancePath, storageType) - : getFileSystemFeedInstanceName(feedInstancePath, feed, cluster, nominalTime); - - } catch (URISyntaxException e) { - throw new FalconException(e); - } - } - - private static String getTableFeedInstanceName(Feed feed, String feedInstancePath, - Storage.TYPE storageType) throws URISyntaxException { - CatalogStorage instanceStorage = (CatalogStorage) FeedHelper.createStorage( - storageType.name(), feedInstancePath); - return feed.getName() + "/" + instanceStorage.toPartitionAsPath(); - } - - private static String getFileSystemFeedInstanceName(String feedInstancePath, Feed feed, - Cluster cluster, - String nominalTime) throws FalconException { - Storage rawStorage = FeedHelper.createStorage(cluster, feed); - String feedPathTemplate = rawStorage.getUriTemplate(LocationType.DATA); - String instance = feedInstancePath; - - String[] elements = FeedDataPath.PATTERN.split(feedPathTemplate); - for (String element : elements) { - instance = 
instance.replaceFirst(element, ""); - } - - Date instanceTime = FeedHelper.getDate(feedPathTemplate, - new Path(feedInstancePath), TimeZone.getTimeZone("UTC")); - - return StringUtils.isEmpty(instance) - ? feed.getName() + "/" + nominalTime - : feed.getName() + "/" - + SchemaHelper.formatDateUTC(instanceTime); - } -}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java deleted file mode 100644 index cf2b651..0000000 --- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java +++ /dev/null @@ -1,338 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.metadata; - -import com.thinkaurelius.titan.graphdb.blueprints.TitanBlueprintsGraph; -import com.tinkerpop.blueprints.Edge; -import com.tinkerpop.blueprints.Graph; -import com.tinkerpop.blueprints.GraphFactory; -import com.tinkerpop.blueprints.KeyIndexableGraph; -import com.tinkerpop.blueprints.TransactionalGraph; -import com.tinkerpop.blueprints.Vertex; -import com.tinkerpop.blueprints.util.TransactionRetryHelper; -import com.tinkerpop.blueprints.util.TransactionWork; -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.service.ConfigurationChangeListener; -import org.apache.falcon.service.FalconService; -import org.apache.falcon.service.Services; -import org.apache.falcon.util.StartupProperties; -import org.apache.falcon.workflow.WorkflowJobEndNotificationService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.falcon.workflow.WorkflowExecutionContext; -import org.apache.falcon.workflow.WorkflowExecutionListener; - -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -/** - * Metadata relationship mapping service. Maps relationships into a graph database. - */ -public class MetadataMappingService - implements FalconService, ConfigurationChangeListener, WorkflowExecutionListener { - - private static final Logger LOG = LoggerFactory.getLogger(MetadataMappingService.class); - - /** - * Constance for the service name. - */ - public static final String SERVICE_NAME = MetadataMappingService.class.getSimpleName(); - - /** - * Constant for the configuration property that indicates the prefix. 
- */ - private static final String FALCON_PREFIX = "falcon.graph."; - - - private Graph graph; - private Set<String> vertexIndexedKeys; - private Set<String> edgeIndexedKeys; - private EntityRelationshipGraphBuilder entityGraphBuilder; - private InstanceRelationshipGraphBuilder instanceGraphBuilder; - - private int transactionRetries; - private long transactionRetryDelayInMillis; - - @Override - public String getName() { - return SERVICE_NAME; - } - - @Override - public void init() throws FalconException { - graph = initializeGraphDB(); - createIndicesForVertexKeys(); - // todo - create Edge Cardinality Constraints - LOG.info("Initialized graph db: {}", graph); - - vertexIndexedKeys = getIndexableGraph().getIndexedKeys(Vertex.class); - LOG.info("Init vertex property keys: {}", vertexIndexedKeys); - - edgeIndexedKeys = getIndexableGraph().getIndexedKeys(Edge.class); - LOG.info("Init edge property keys: {}", edgeIndexedKeys); - - boolean preserveHistory = Boolean.valueOf(StartupProperties.get().getProperty( - "falcon.graph.preserve.history", "false")); - entityGraphBuilder = new EntityRelationshipGraphBuilder(graph, preserveHistory); - instanceGraphBuilder = new InstanceRelationshipGraphBuilder(graph, preserveHistory); - - ConfigurationStore.get().registerListener(this); - Services.get().<WorkflowJobEndNotificationService>getService( - WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this); - try { - transactionRetries = Integer.parseInt(StartupProperties.get().getProperty( - "falcon.graph.transaction.retry.count", "3")); - transactionRetryDelayInMillis = Long.parseLong(StartupProperties.get().getProperty( - "falcon.graph.transaction.retry.delay", "5")); - } catch (NumberFormatException e) { - throw new FalconException("Invalid values for graph transaction retry delay/count " + e); - } - } - - protected Graph initializeGraphDB() { - LOG.info("Initializing graph db"); - - Configuration graphConfig = getConfiguration(); - return 
GraphFactory.open(graphConfig); - } - - public static Configuration getConfiguration() { - Configuration graphConfig = new BaseConfiguration(); - - Properties configProperties = StartupProperties.get(); - for (Map.Entry entry : configProperties.entrySet()) { - String name = (String) entry.getKey(); - if (name.startsWith(FALCON_PREFIX)) { - String value = (String) entry.getValue(); - name = name.substring(FALCON_PREFIX.length()); - graphConfig.setProperty(name, value); - } - } - - return graphConfig; - } - - /** - * This unfortunately requires a handle to Titan implementation since - * com.tinkerpop.blueprints.KeyIndexableGraph#createKeyIndex does not create an index. - */ - protected void createIndicesForVertexKeys() { - if (!((KeyIndexableGraph) graph).getIndexedKeys(Vertex.class).isEmpty()) { - LOG.info("Indexes already exist for graph"); - return; - } - - LOG.info("Indexes does not exist, Creating indexes for graph"); - // todo - externalize this - makeNameKeyIndex(); - makeKeyIndex(RelationshipProperty.TYPE.getName()); - makeKeyIndex(RelationshipProperty.TIMESTAMP.getName()); - makeKeyIndex(RelationshipProperty.VERSION.getName()); - } - - private void makeNameKeyIndex() { - getTitanGraph().makeKey(RelationshipProperty.NAME.getName()) - .dataType(String.class) - .indexed(Vertex.class) - .indexed(Edge.class) - // .unique() todo this ought to be unique? 
- .make(); - getTitanGraph().commit(); - } - - private void makeKeyIndex(String key) { - getTitanGraph().makeKey(key) - .dataType(String.class) - .indexed(Vertex.class) - .make(); - getTitanGraph().commit(); - } - - public Graph getGraph() { - return graph; - } - - public KeyIndexableGraph getIndexableGraph() { - return (KeyIndexableGraph) graph; - } - - public TransactionalGraph getTransactionalGraph() { - return (TransactionalGraph) graph; - } - - public TitanBlueprintsGraph getTitanGraph() { - return (TitanBlueprintsGraph) graph; - } - - public Set<String> getVertexIndexedKeys() { - return vertexIndexedKeys; - } - - public Set<String> getEdgeIndexedKeys() { - return edgeIndexedKeys; - } - - @Override - public void destroy() throws FalconException { - Services.get().<WorkflowJobEndNotificationService>getService( - WorkflowJobEndNotificationService.SERVICE_NAME).unregisterListener(this); - - LOG.info("Shutting down graph db"); - graph.shutdown(); - } - - @Override - public void onAdd(final Entity entity) throws FalconException { - EntityType entityType = entity.getEntityType(); - LOG.info("Adding lineage for entity: {}, type: {}", entity.getName(), entityType); - try { - new TransactionRetryHelper.Builder<Void>(getTransactionalGraph()) - .perform(new TransactionWork<Void>() { - @Override - public Void execute(TransactionalGraph transactionalGraph) throws Exception { - entityGraphBuilder.addEntity(entity); - transactionalGraph.commit(); - return null; - } - }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis); - - } catch (Exception e) { - getTransactionalGraph().rollback(); - throw new FalconException(e); - } - } - - @Override - public void onRemove(Entity entity) throws FalconException { - // do nothing, we'd leave the deleted entities as-is for historical purposes - // should we mark 'em as deleted? 
- } - - @Override - public void onChange(final Entity oldEntity, final Entity newEntity) throws FalconException { - EntityType entityType = newEntity.getEntityType(); - LOG.info("Updating lineage for entity: {}, type: {}", newEntity.getName(), entityType); - try { - new TransactionRetryHelper.Builder<Void>(getTransactionalGraph()) - .perform(new TransactionWork<Void>() { - @Override - public Void execute(TransactionalGraph transactionalGraph) throws Exception { - entityGraphBuilder.updateEntity(oldEntity, newEntity); - transactionalGraph.commit(); - return null; - } - }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis); - - } catch (Exception e) { - getTransactionalGraph().rollback(); - throw new FalconException(e); - } - } - - @Override - public void onReload(Entity entity) throws FalconException { - onAdd(entity); - } - - @Override - public void onSuccess(final WorkflowExecutionContext context) throws FalconException { - LOG.info("Adding lineage for context {}", context); - try { - new TransactionRetryHelper.Builder<Void>(getTransactionalGraph()) - .perform(new TransactionWork<Void>() { - @Override - public Void execute(TransactionalGraph transactionalGraph) throws Exception { - onSuccessfulExecution(context); - transactionalGraph.commit(); - return null; - } - }).build().exponentialBackoff(transactionRetries, transactionRetryDelayInMillis); - } catch (Exception e) { - getTransactionalGraph().rollback(); - throw new FalconException(e); - } - } - - private void onSuccessfulExecution(final WorkflowExecutionContext context) throws FalconException { - WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation(); - switch (entityOperation) { - case GENERATE: - onProcessInstanceExecuted(context); - break; - case REPLICATE: - onFeedInstanceReplicated(context); - break; - case DELETE: - onFeedInstanceEvicted(context); - break; - case IMPORT: - onFeedInstanceImported(context); - break; - default: - throw new 
IllegalArgumentException("Invalid EntityOperation - " + entityOperation); - } - } - - @Override - public void onFailure(WorkflowExecutionContext context) throws FalconException { - // do nothing since lineage is only recorded for successful workflow - } - - @Override - public void onStart(WorkflowExecutionContext context) throws FalconException { - // Do nothing - } - - @Override - public void onSuspend(WorkflowExecutionContext context) throws FalconException { - // Do nothing - } - - @Override - public void onWait(WorkflowExecutionContext context) throws FalconException { - // TBD - } - - - private void onProcessInstanceExecuted(WorkflowExecutionContext context) throws FalconException { - Vertex processInstance = instanceGraphBuilder.addProcessInstance(context); - instanceGraphBuilder.addOutputFeedInstances(context, processInstance); - instanceGraphBuilder.addInputFeedInstances(context, processInstance); - } - - private void onFeedInstanceReplicated(WorkflowExecutionContext context) throws FalconException { - LOG.info("Adding replicated feed instance: {}", context.getNominalTimeAsISO8601()); - instanceGraphBuilder.addReplicatedInstance(context); - } - - private void onFeedInstanceEvicted(WorkflowExecutionContext context) throws FalconException { - LOG.info("Adding evicted feed instance: {}", context.getNominalTimeAsISO8601()); - instanceGraphBuilder.addEvictedInstance(context); - } - private void onFeedInstanceImported(WorkflowExecutionContext context) throws FalconException { - LOG.info("Adding imported feed instance: {}", context.getNominalTimeAsISO8601()); - instanceGraphBuilder.addImportedInstance(context); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java 
b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java deleted file mode 100644 index 0c3fcee..0000000 --- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.metadata; - -import com.tinkerpop.blueprints.Direction; -import com.tinkerpop.blueprints.Edge; -import com.tinkerpop.blueprints.Graph; -import com.tinkerpop.blueprints.GraphQuery; -import com.tinkerpop.blueprints.Vertex; -import org.apache.falcon.entity.v0.SchemaHelper; -import org.apache.falcon.security.CurrentUser; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.Iterator; - -/** - * Base class for Metadata relationship mapping helper. - */ -public abstract class RelationshipGraphBuilder { - - private static final Logger LOG = LoggerFactory.getLogger(RelationshipGraphBuilder.class); - - /** - * A blueprints graph. - */ - private final Graph graph; - - /** - * If enabled, preserves history of tags and groups for instances else will only - * be available for entities. 
- */ - private final boolean preserveHistory; - - protected RelationshipGraphBuilder(Graph graph, boolean preserveHistory) { - this.graph = graph; - this.preserveHistory = preserveHistory; - } - - public Graph getGraph() { - return graph; - } - - protected boolean isPreserveHistory() { - return preserveHistory; - } - - public Vertex addVertex(String name, RelationshipType type) { - Vertex vertex = findVertex(name, type); - if (vertex != null) { - LOG.debug("Found an existing vertex for: name={}, type={}", name, type); - return vertex; - } - - return createVertex(name, type); - } - - protected Vertex addVertex(String name, RelationshipType type, long timestamp) { - Vertex vertex = findVertex(name, type); - if (vertex != null) { - LOG.debug("Found an existing vertex for: name={}, type={}", name, type); - return vertex; - } - - return createVertex(name, type, timestamp); - } - - protected Vertex findVertex(String name, RelationshipType type) { - LOG.debug("Finding vertex for: name={}, type={}", name, type); - - GraphQuery query = graph.query() - .has(RelationshipProperty.NAME.getName(), name) - .has(RelationshipProperty.TYPE.getName(), type.getName()); - Iterator<Vertex> results = query.vertices().iterator(); - return results.hasNext() ? 
results.next() : null; // returning one since name is unique - } - - protected Vertex createVertex(String name, RelationshipType type) { - return createVertex(name, type, System.currentTimeMillis()); - } - - protected Vertex createVertex(String name, RelationshipType type, long timestamp) { - LOG.debug("Creating a new vertex for: name={}, type={}", name, type); - - Vertex vertex = graph.addVertex(null); - vertex.setProperty(RelationshipProperty.NAME.getName(), name); - vertex.setProperty(RelationshipProperty.TYPE.getName(), type.getName()); - vertex.setProperty(RelationshipProperty.TIMESTAMP.getName(), timestamp); - - return vertex; - } - - protected Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) { - return addEdge(fromVertex, toVertex, edgeLabel, null); - } - - protected Edge addEdge(Vertex fromVertex, Vertex toVertex, - String edgeLabel, String timestamp) { - Edge edge = findEdge(fromVertex, toVertex, edgeLabel); - - Edge edgeToVertex = edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex); - if (timestamp != null) { - edgeToVertex.setProperty(RelationshipProperty.TIMESTAMP.getName(), timestamp); - } - - return edgeToVertex; - } - - protected void removeEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) { - Edge edge = findEdge(fromVertex, toVertex, edgeLabel); - if (edge != null) { - getGraph().removeEdge(edge); - } - } - - protected void removeEdge(Vertex fromVertex, Object toVertexName, String edgeLabel) { - Edge edge = findEdge(fromVertex, toVertexName, edgeLabel); - if (edge != null) { - getGraph().removeEdge(edge); - } - } - - protected Edge findEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) { - return findEdge(fromVertex, toVertex.getProperty(RelationshipProperty.NAME.getName()), edgeLabel); - } - - protected Edge findEdge(Vertex fromVertex, Object toVertexName, String edgeLabel) { - Edge edgeToFind = null; - for (Edge edge : fromVertex.getEdges(Direction.OUT, edgeLabel)) { - if 
(edge.getVertex(Direction.IN).getProperty(RelationshipProperty.NAME.getName()).equals(toVertexName)) { - edgeToFind = edge; - break; - } - } - - return edgeToFind; - } - - protected void addUserRelation(Vertex fromVertex) { - addUserRelation(fromVertex, RelationshipLabel.USER.getName()); - } - - protected void addUserRelation(Vertex fromVertex, String edgeLabel) { - Vertex relationToUserVertex = addVertex(CurrentUser.getUser(), RelationshipType.USER); - addEdge(fromVertex, relationToUserVertex, edgeLabel); - } - - protected void addDataClassification(String classification, Vertex entityVertex) { - if (classification == null || classification.length() == 0) { - return; - } - - String[] tags = classification.split(","); - for (String tag : tags) { - int index = tag.indexOf("="); - String tagKey = tag.substring(0, index); - String tagValue = tag.substring(index + 1, tag.length()); - - Vertex tagValueVertex = addVertex(tagValue, RelationshipType.TAGS); - addEdge(entityVertex, tagValueVertex, tagKey); - } - } - - protected void addGroups(String groups, Vertex fromVertex) { - addCSVTags(groups, fromVertex, RelationshipType.GROUPS, RelationshipLabel.GROUPS); - } - - protected void addPipelines(String pipelines, Vertex fromVertex) { - addCSVTags(pipelines, fromVertex, RelationshipType.PIPELINES, RelationshipLabel.PIPELINES); - } - - protected void addProcessFeedEdge(Vertex processVertex, Vertex feedVertex, - RelationshipLabel edgeLabel) { - if (edgeLabel == RelationshipLabel.FEED_PROCESS_EDGE) { - addEdge(feedVertex, processVertex, edgeLabel.getName()); - } else { - addEdge(processVertex, feedVertex, edgeLabel.getName()); - } - } - - protected String getCurrentTimeStamp() { - return SchemaHelper.formatDateUTC(new Date()); - } - - /** - * Adds comma separated values as tags. - * - * @param csvTags comma separated values. - * @param fromVertex from vertex. - * @param relationshipType vertex type. - * @param edgeLabel edge label. 
- */ - private void addCSVTags(String csvTags, Vertex fromVertex, - RelationshipType relationshipType, RelationshipLabel edgeLabel) { - if (StringUtils.isEmpty(csvTags)) { - return; - } - - String[] tags = csvTags.split(","); - for (String tag : tags) { - Vertex vertex = addVertex(tag, relationshipType); - addEdge(fromVertex, vertex, edgeLabel.getName()); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java deleted file mode 100644 index 6d4bf46..0000000 --- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.metadata; - -/** - * Enumerates Relationship edge labels. 
/**
 * Edge labels used when wiring entity and instance vertices together
 * in the metadata relationship graph.
 */
public enum RelationshipLabel {

    // labels on entity-to-entity edges
    FEED_CLUSTER_EDGE("stored-in"),
    PROCESS_CLUSTER_EDGE("runs-on"),
    FEED_PROCESS_EDGE("input"),
    PROCESS_FEED_EDGE("output"),
    DATASOURCE_IMPORT_EDGE("import"),

    // label linking an instance vertex to its defining entity
    INSTANCE_ENTITY_EDGE("instance-of"),

    // miscellaneous relationship labels
    CLUSTER_COLO("collocated"),
    USER("owned-by"),
    GROUPS("grouped-as"),
    PIPELINES("pipeline"),

    // replication
    FEED_CLUSTER_REPLICATED_EDGE("replicated-to"),

    // eviction
    FEED_CLUSTER_EVICTED_EDGE("evicted-from");

    /** Label string as persisted on graph edges. */
    private final String name;

    RelationshipLabel(String name) {
        this.name = name;
    }

    /** Returns the label string persisted on graph edges. */
    public String getName() {
        return name;
    }
}
/**
 * Property keys stored on vertices and edges of the metadata
 * relationship graph.
 */
public enum RelationshipProperty {

    // vertex property keys - indexed
    NAME("name"),
    TYPE("type"),
    TIMESTAMP("timestamp"),
    VERSION("version"),

    // workflow properties
    USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type"),
    USER_WORKFLOW_NAME("userWorkflowName", "user workflow name"),
    USER_WORKFLOW_VERSION("userWorkflowVersion", "user workflow version"),

    // workflow instance properties
    WORKFLOW_ID("workflowId", "current workflow-id of the instance"),
    RUN_ID("runId", "current run-id of the instance"),
    STATUS("status", "status of the user workflow instance"),
    WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex: oozie"),
    USER_SUBFLOW_ID("subflowId", "external id of user workflow");

    /** Property key as stored in the graph. */
    private final String name;
    /** Human-readable description of the property. */
    private final String description;

    // single-argument form: the key doubles as its own description
    RelationshipProperty(String name) {
        this(name, name);
    }

    RelationshipProperty(String name, String description) {
        this.name = name;
        this.description = description;
    }

    /** Returns the property key as stored in the graph. */
    public String getName() {
        return name;
    }

    /** Returns the human-readable description of the property. */
    public String getDescription() {
        return description;
    }
}
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.retention; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -/** - * Utility class for serializing and deserializing the evicted instance paths. - */ - -public final class EvictedInstanceSerDe { - - private static final Logger LOG = LoggerFactory.getLogger(EvictedInstanceSerDe.class); - - public static final String INSTANCEPATH_PREFIX = "instancePaths="; - public static final String INSTANCEPATH_SEPARATOR = ","; - - - private EvictedInstanceSerDe() {} - - /** - * This method serializes the evicted instances to a file in logs dir for a given feed. 
- * @see org.apache.falcon.retention.FeedEvictor - * - * *Note:* This is executed with in the map task for evictor action - * - * @param fileSystem file system handle - * @param logFilePath File path to serialize the instances to - * @param instances list of instances, comma separated - * @throws IOException - */ - public static void serializeEvictedInstancePaths(final FileSystem fileSystem, - final Path logFilePath, - StringBuffer instances) throws IOException { - LOG.info("Writing deleted instances {} to path {}", instances, logFilePath); - OutputStream out = null; - try { - out = fileSystem.create(logFilePath); - instances.insert(0, INSTANCEPATH_PREFIX); // add the prefix - out.write(instances.toString().getBytes()); - - // To make sure log cleaning service can delete this file - FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL); - fileSystem.setPermission(logFilePath, permission); - } finally { - if (out != null) { - out.close(); - } - } - - if (LOG.isDebugEnabled()) { - logEvictedInstancePaths(fileSystem, logFilePath); - } - } - - private static void logEvictedInstancePaths(final FileSystem fs, - final Path outPath) throws IOException { - ByteArrayOutputStream writer = new ByteArrayOutputStream(); - InputStream instance = fs.open(outPath); - IOUtils.copyBytes(instance, writer, 4096, true); - LOG.debug("Instance Paths copied to {}", outPath); - LOG.debug("Written {}", writer); - } - - /** - * This method deserializes the evicted instances from a log file on hdfs. 
- * @see org.apache.falcon.messaging.JMSMessageProducer - * *Note:* This is executed with in the falcon server - * - * @param fileSystem file system handle - * @param logFile File path to serialize the instances to - * @return list of instances, comma separated - * @throws IOException - */ - public static String[] deserializeEvictedInstancePaths(final FileSystem fileSystem, - final Path logFile) throws IOException { - ByteArrayOutputStream writer = new ByteArrayOutputStream(); - InputStream instance = fileSystem.open(logFile); - IOUtils.copyBytes(instance, writer, 4096, true); - String[] instancePaths = writer.toString().split(INSTANCEPATH_PREFIX); - - if (instancePaths.length <= 1) { - LOG.info("Returning 0 instance paths for feed "); - return new String[0]; - } else { - LOG.info("Returning instance paths for feed {}", instancePaths[1]); - return instancePaths[1].split(INSTANCEPATH_SEPARATOR); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java deleted file mode 100644 index 1457b06..0000000 --- a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.retention; - -import org.apache.commons.el.ExpressionEvaluatorImpl; -import org.apache.falcon.Pair; -import org.apache.falcon.expression.ExpressionHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.servlet.jsp.el.ELException; -import javax.servlet.jsp.el.ExpressionEvaluator; -import java.util.Date; - -/** - * Utilities for feed eviction. - */ -public final class EvictionHelper { - - private static final Logger LOG = LoggerFactory.getLogger(EvictionHelper.class); - - private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl(); - private static final ExpressionHelper RESOLVER = ExpressionHelper.get(); - - private EvictionHelper(){} - - public static Pair<Date, Date> getDateRange(String period) throws ELException { - Long duration = (Long) EVALUATOR.evaluate("${" + period + "}", - Long.class, RESOLVER, RESOLVER); - Date end = new Date(); - Date start = new Date(end.getTime() - duration); - return Pair.of(start, end); - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java b/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java deleted file mode 100644 index f7b2155..0000000 --- 
a/common/src/main/java/org/apache/falcon/security/AuthenticationInitializationService.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.security; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang.Validate; -import org.apache.falcon.FalconException; -import org.apache.falcon.aspect.GenericAlert; -import org.apache.falcon.service.FalconService; -import org.apache.falcon.util.StartupProperties; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.Date; -import java.util.Properties; -import java.util.Timer; -import java.util.TimerTask; - - -/** - * Authentication Service at startup that initializes the authentication credentials - * based on authentication type. If Kerberos is enabled, it logs in the user with the key tab. 
- */ -public class AuthenticationInitializationService implements FalconService { - - private static final Logger LOG = LoggerFactory.getLogger(AuthenticationInitializationService.class); - - /** - * Constant for the configuration property that indicates the prefix. - */ - protected static final String CONFIG_PREFIX = "falcon.service.authentication."; - - /** - * Constant for the configuration property that indicates the keytab file path. - */ - protected static final String KERBEROS_KEYTAB = CONFIG_PREFIX + KerberosAuthenticationHandler.KEYTAB; - - /** - * Constant for the configuration property that indicates the kerberos principal. - */ - protected static final String KERBEROS_PRINCIPAL = CONFIG_PREFIX + KerberosAuthenticationHandler.PRINCIPAL; - - /** - * Constant for the configuration property that indicates the authentication token validity time in seconds. - */ - protected static final String AUTH_TOKEN_VALIDITY_SECONDS = CONFIG_PREFIX + "token.validity"; - - private Timer timer = new Timer(); - private static final String SERVICE_NAME = "Authentication initialization service"; - - @Override - public String getName() { - return SERVICE_NAME; - } - - @Override - public void init() throws FalconException { - - if (SecurityUtil.isSecurityEnabled()) { - LOG.info("Falcon Kerberos Authentication Enabled!"); - initializeKerberos(); - - String authTokenValidity = StartupProperties.get().getProperty(AUTH_TOKEN_VALIDITY_SECONDS); - long validateFrequency; - try { - validateFrequency = (StringUtils.isNotEmpty(authTokenValidity)) - ? 
Long.parseLong(authTokenValidity) : 86400; - } catch (NumberFormatException nfe) { - throw new FalconException("Invalid value provided for startup property \"" - + AUTH_TOKEN_VALIDITY_SECONDS + "\", please provide a valid long number", nfe); - } - timer.schedule(new TokenValidationThread(), 0, validateFrequency*1000); - } else { - LOG.info("Falcon Simple Authentication Enabled!"); - Configuration ugiConf = new Configuration(); - ugiConf.set("hadoop.security.authentication", "simple"); - UserGroupInformation.setConfiguration(ugiConf); - } - } - - protected static void initializeKerberos() throws FalconException { - try { - Properties configuration = StartupProperties.get(); - String principal = configuration.getProperty(KERBEROS_PRINCIPAL); - Validate.notEmpty(principal, - "Missing required configuration property: " + KERBEROS_PRINCIPAL); - principal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal( - principal, SecurityUtil.getLocalHostName()); - - String keytabFilePath = configuration.getProperty(KERBEROS_KEYTAB); - Validate.notEmpty(keytabFilePath, - "Missing required configuration property: " + KERBEROS_KEYTAB); - checkIsReadable(keytabFilePath); - - Configuration conf = new Configuration(); - conf.set("hadoop.security.authentication", "kerberos"); - - UserGroupInformation.setConfiguration(conf); - UserGroupInformation.loginUserFromKeytab(principal, keytabFilePath); - - LOG.info("Got Kerberos ticket, keytab: {}, Falcon principal: {}", keytabFilePath, principal); - } catch (Exception ex) { - throw new FalconException("Could not initialize " + SERVICE_NAME - + ": " + ex.getMessage(), ex); - } - } - - private static void checkIsReadable(String keytabFilePath) { - File keytabFile = new File(keytabFilePath); - if (!keytabFile.exists()) { - throw new IllegalArgumentException("The keytab file does not exist! " + keytabFilePath); - } - - if (!keytabFile.isFile()) { - throw new IllegalArgumentException("The keytab file cannot be a directory! 
" + keytabFilePath); - } - - if (!keytabFile.canRead()) { - throw new IllegalArgumentException("The keytab file is not readable! " + keytabFilePath); - } - } - - @Override - public void destroy() throws FalconException { - timer.cancel(); - } - - private static class TokenValidationThread extends TimerTask { - @Override - public void run() { - try { - LOG.info("Validating Auth Token: {}", new Date()); - initializeKerberos(); - } catch (Throwable t) { - LOG.error("Error in Auth Token Validation task: ", t); - GenericAlert.initializeKerberosFailed( - "Exception in Auth Token Validation : ", t); - } - } - } - - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java b/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java deleted file mode 100644 index a6f2564..0000000 --- a/common/src/main/java/org/apache/falcon/security/AuthorizationProvider.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.security; - -import org.apache.falcon.entity.EntityNotRegisteredException; -import org.apache.falcon.entity.v0.AccessControlList; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.AuthorizationException; - -import java.io.IOException; - -/** - * An interface for authorizing user against an entity operation. - */ -public interface AuthorizationProvider { - - /** - * Check if the authenticated user is a super user. - * - * @param authenticatedUGI proxy ugi for the authenticated user - * @return true if sure user, else false - */ - boolean isSuperUser(UserGroupInformation authenticatedUGI); - - /** - * Checks if authenticated user can proxy the entity acl owner. - * - * @param authenticatedUGI proxy ugi for the authenticated user. - * @param aclOwner entity ACL Owner. - * @param aclGroup entity ACL group. - * @throws IOException - */ - boolean shouldProxy(UserGroupInformation authenticatedUGI, - String aclOwner, String aclGroup) throws IOException; - - /** - * Determines if the authenticated user is authorized to execute the action on the resource, - * which is typically a REST resource path. - * Throws an exception if not authorized. - * - * @param resource api resource, admin, entities or instance - * @param action action being authorized on resource and entity if applicable - * @param entityType entity type in question, not for admin resource - * @param entityName entity name in question, not for admin resource - * @param authenticatedUGI proxy ugi for the authenticated user - * @throws AuthorizationException - */ - void authorizeResource(String resource, - String action, - String entityType, - String entityName, - UserGroupInformation authenticatedUGI) - throws AuthorizationException, EntityNotRegisteredException; - - /** - * Determines if the authenticated user is authorized to execute the action on the entity. - * Throws an exception if not authorized. 
- * - * @param entityName entity in question, applicable for entities and instance resource - * @param entityType entity in question, applicable for entities and instance resource - * @param acl entity ACL - * @param action action being authorized on resource and entity if applicable - * @param authenticatedUGI proxy ugi for the authenticated user - * @throws AuthorizationException - */ - void authorizeEntity(String entityName, String entityType, - AccessControlList acl, String action, - UserGroupInformation authenticatedUGI) throws AuthorizationException; -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/security/CredentialProviderHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/CredentialProviderHelper.java b/common/src/main/java/org/apache/falcon/security/CredentialProviderHelper.java deleted file mode 100644 index fc4f745..0000000 --- a/common/src/main/java/org/apache/falcon/security/CredentialProviderHelper.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.security; - -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -/** - * Helper class for Hadoop credential provider functionality. Reflection to used to avoid - * directly referencing the classes and methods so that version dependency is not introduced - * as the Hadoop credential provider is only introduced in 2.6.0 and later. - */ - -public final class CredentialProviderHelper { - - private static final Logger LOG = LoggerFactory.getLogger(CredentialProviderHelper.class); - - private static Class<?> clsCredProvider; - private static Class<?> clsCredProviderFactory; - private static Method methGetPassword; - private static Method methCreateCredEntry; - private static Method methFlush; - private static Method methGetProviders; - - public static final String CREDENTIAL_PROVIDER_PATH = "hadoop.security.credential.provider.path"; - - static { - try { - LOG.debug("Reflecting credential provider classes and methods"); - clsCredProvider = Class.forName("org.apache.hadoop.security.alias.CredentialProvider"); - clsCredProviderFactory = Class.forName("org.apache.hadoop.security.alias.CredentialProviderFactory"); - methCreateCredEntry = clsCredProvider.getMethod("createCredentialEntry", String.class, char[].class); - methFlush = clsCredProvider.getMethod("flush"); - methGetPassword = Configuration.class.getMethod("getPassword", String.class); - methGetProviders = clsCredProviderFactory.getMethod("getProviders", new Class[] { Configuration.class }); - LOG.debug("Found CredentialProviderFactory#getProviders"); - } catch (ClassNotFoundException | NoSuchMethodException cnfe) { - LOG.debug("Ignoring exception", cnfe); - } - } - - private CredentialProviderHelper() { - - } - - public static boolean isProviderAvailable() { - return !(clsCredProvider == null - || 
clsCredProviderFactory == null - || methCreateCredEntry == null - || methGetPassword == null - || methFlush == null); - } - - public static String resolveAlias(Configuration conf, String alias) throws IOException { - try { - char[] cred = (char[]) methGetPassword.invoke(conf, alias); - if (cred == null) { - throw new IOException("The provided alias cannot be resolved"); - } - return new String(cred); - } catch (InvocationTargetException ite) { - throw new RuntimeException("Error resolving password " - + " from the credential providers ", ite.getTargetException()); - } catch (IllegalAccessException iae) { - throw new RuntimeException("Error invoking the credential provider method", iae); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/security/CurrentUser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java deleted file mode 100644 index e7c1594..0000000 --- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java +++ /dev/null @@ -1,244 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.security; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.service.ProxyUserService; -import org.apache.falcon.service.Services; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Current authenticated user via REST. Also captures the proxy user from authorized entity - * and doles out proxied UserGroupInformation. Caches proxied users. - */ -public final class CurrentUser { - - private static final Logger LOG = LoggerFactory.getLogger(CurrentUser.class); - private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT"); - - private final String authenticatedUser; - private String proxyUser; - - private CurrentUser(String authenticatedUser) { - this.authenticatedUser = authenticatedUser; - this.proxyUser = authenticatedUser; - } - - private static final ThreadLocal<CurrentUser> CURRENT_USER = new ThreadLocal<CurrentUser>(); - - /** - * Captures the authenticated user. - * - * @param user authenticated user - */ - public static void authenticate(final String user) { - if (StringUtils.isEmpty(user)) { - throw new IllegalStateException("Bad user name sent for authentication"); - } - - LOG.info("Logging in {}", user); - CurrentUser currentUser = new CurrentUser(user); - CURRENT_USER.set(currentUser); - } - - /** - * Proxies doAs user. 
- * - * @param doAsUser doAs user - * @param proxyHost proxy host - * @throws IOException - */ - public static void proxyDoAsUser(final String doAsUser, - final String proxyHost) throws IOException { - if (!isAuthenticated()) { - throw new IllegalStateException("Authentication not done"); - } - - String currentUser = CURRENT_USER.get().authenticatedUser; - if (StringUtils.isNotEmpty(doAsUser) && !doAsUser.equalsIgnoreCase(currentUser)) { - if (StringUtils.isEmpty(proxyHost)) { - throw new IllegalArgumentException("proxy host cannot be null or empty"); - } - ProxyUserService proxyUserService = Services.get().getService(ProxyUserService.SERVICE_NAME); - try { - proxyUserService.validate(currentUser, proxyHost, doAsUser); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - - CurrentUser user = CURRENT_USER.get(); - LOG.info("Authenticated user {} is proxying doAs user {} from host {}", - user.authenticatedUser, doAsUser, proxyHost); - AUDIT.info("Authenticated user {} is proxying doAs user {} from host {}", - user.authenticatedUser, doAsUser, proxyHost); - user.proxyUser = doAsUser; - } - } - - /** - * Captures the entity owner if authenticated user is a super user. - * - * @param aclOwner entity acl owner - * @param aclGroup entity acl group - * @throws IOException - */ - public static void proxy(final String aclOwner, - final String aclGroup) throws IOException { - if (!isAuthenticated() || StringUtils.isEmpty(aclOwner)) { - throw new IllegalStateException("Authentication not done or Bad user name"); - } - - CurrentUser user = CURRENT_USER.get(); - LOG.info("Authenticated user {} is proxying entity owner {}/{}", - user.authenticatedUser, aclOwner, aclGroup); - AUDIT.info("Authenticated user {} is proxying entity owner {}/{}", - user.authenticatedUser, aclOwner, aclGroup); - user.proxyUser = aclOwner; - } - - /** - * Clears the context. 
- */ - public static void clear() { - CURRENT_USER.remove(); - } - - /** - * Checks if the authenticate method is already called. - * - * @return true if authenticated user is set else false - */ - public static boolean isAuthenticated() { - CurrentUser user = CURRENT_USER.get(); - return user != null && user.authenticatedUser != null; - } - - /** - * Returns authenticated user. - * - * @return logged in authenticated user. - */ - public static String getAuthenticatedUser() { - CurrentUser user = CURRENT_USER.get(); - if (user == null || user.authenticatedUser == null) { - throw new IllegalStateException("No user logged into the system"); - } else { - return user.authenticatedUser; - } - } - - /** - * Dole out a UGI object for the current authenticated user if authenticated - * else return current user. - * - * @return UGI object - * @throws java.io.IOException - */ - public static UserGroupInformation getAuthenticatedUGI() throws IOException { - return CurrentUser.isAuthenticated() - ? createProxyUGI(getAuthenticatedUser()) : UserGroupInformation.getCurrentUser(); - } - - /** - * Returns the proxy user. - * - * @return proxy user - */ - public static String getUser() { - CurrentUser user = CURRENT_USER.get(); - if (user == null || user.proxyUser == null) { - throw new IllegalStateException("No user logged into the system"); - } else { - return user.proxyUser; - } - } - - private static ConcurrentMap<String, UserGroupInformation> userUgiMap = - new ConcurrentHashMap<String, UserGroupInformation>(); - - /** - * Create a proxy UGI object for the proxy user. 
- * - * @param proxyUser logged in user - * @return UGI object - * @throws IOException - */ - public static UserGroupInformation createProxyUGI(String proxyUser) throws IOException { - UserGroupInformation proxyUgi = userUgiMap.get(proxyUser); - if (proxyUgi == null) { - // taking care of a race condition, the latest UGI will be discarded - proxyUgi = UserGroupInformation.createProxyUser( - proxyUser, UserGroupInformation.getLoginUser()); - userUgiMap.putIfAbsent(proxyUser, proxyUgi); - } - - return proxyUgi; - } - - /** - * Dole out a proxy UGI object for the current authenticated user if authenticated - * else return current user. - * - * @return UGI object - * @throws java.io.IOException - */ - public static UserGroupInformation getProxyUGI() throws IOException { - return CurrentUser.isAuthenticated() - ? createProxyUGI(getUser()) : UserGroupInformation.getCurrentUser(); - } - - /** - * Gets a collection of group names the proxy user belongs to. - * - * @return group names - * @throws IOException - */ - public static Set<String> getGroupNames() throws IOException { - HashSet<String> s = new HashSet<String>(Arrays.asList(getProxyUGI().getGroupNames())); - return Collections.unmodifiableSet(s); - } - - /** - * Returns the primary group name for the proxy user. - * - * @return primary group name for the proxy user - */ - public static String getPrimaryGroupName() { - try { - String[] groups = getProxyUGI().getGroupNames(); - if (groups.length > 0) { - return groups[0]; - } - } catch (IOException ignore) { - // ignored - } - - return "unknown"; // this can only happen in tests - } -}
