Repository: falcon Updated Branches: refs/heads/master c462f3e05 -> f7ad3f487
http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java new file mode 100644 index 0000000..0a87213 --- /dev/null +++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.lifecycle.engine.oozie.retention; + +import org.apache.falcon.FalconException; +import org.apache.falcon.LifeCycle; +import org.apache.falcon.Tag; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.HiveUtil; +import org.apache.falcon.entity.Storage; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.RetentionStage; +import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils; +import org.apache.falcon.lifecycle.retention.AgeBasedDelete; +import org.apache.falcon.oozie.workflow.ACTION; +import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.security.SecurityUtil; +import org.apache.falcon.workflow.WorkflowExecutionArgs; +import org.apache.falcon.workflow.WorkflowExecutionContext; +import org.apache.hadoop.fs.Path; + +import java.util.Properties; + +/** + * Workflow Builder for AgeBasedDelete. 
+ */ +public final class AgeBasedWorkflowBuilder { + private static final String EVICTION_ACTION_TEMPLATE = "/action/feed/eviction-action.xml"; + private static final String EVICTION_ACTION_NAME = "eviction"; + + private AgeBasedWorkflowBuilder(){ + + } + + public static Properties build(Cluster cluster, Path basePath, Feed feed) throws FalconException { + Path buildPath = OozieBuilderUtils.getBuildPath(basePath, LifeCycle.EVICTION.getTag()); + WORKFLOWAPP workflow = new WORKFLOWAPP(); + String wfName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString(); + + //Add eviction action + ACTION eviction = OozieBuilderUtils.unmarshalAction(EVICTION_ACTION_TEMPLATE); + OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME, + OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME); + workflow.getDecisionOrForkOrJoin().add(eviction); + + //Add post-processing actions + ACTION success = OozieBuilderUtils.getSuccessPostProcessAction(); + OozieBuilderUtils.addTransition(success, OozieBuilderUtils.OK_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME); + workflow.getDecisionOrForkOrJoin().add(success); + + ACTION fail = OozieBuilderUtils.getFailPostProcessAction(); + OozieBuilderUtils.addTransition(fail, OozieBuilderUtils.FAIL_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME); + workflow.getDecisionOrForkOrJoin().add(fail); + + OozieBuilderUtils.decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME); + OozieBuilderUtils.addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION, EntityType.FEED); + + // Prepare and marshal properties to config-default.xml + Properties props = OozieBuilderUtils.getProperties(buildPath, wfName); + props.putAll(getWorkflowProperties(feed, cluster)); + props.putAll(OozieBuilderUtils.createDefaultConfiguration(cluster, feed, + WorkflowExecutionContext.EntityOperations.DELETE)); + props.putAll(FeedHelper.getUserWorkflowProperties(LifeCycle.EVICTION)); + // override the queueName and priority + RetentionStage 
retentionStage = FeedHelper.getRetentionStage(feed, cluster.getName()); + props.put(OozieBuilderUtils.MR_QUEUE_NAME, retentionStage.getQueue()); + props.put(OozieBuilderUtils.MR_JOB_PRIORITY, retentionStage.getPriority()); + + if (EntityUtil.isTableStorageType(cluster, feed)) { + setupHiveCredentials(cluster, buildPath, workflow); + // copy paste todo kludge send source hcat creds for coord dependency check to pass + props.putAll(HiveUtil.getHiveCredentials(cluster)); + } + + // Write out the config to config-default.xml + OozieBuilderUtils.marshalDefaultConfig(cluster, workflow, props, buildPath); + + // write out the workflow.xml + OozieBuilderUtils.marshalWokflow(cluster, workflow, buildPath); + return props; + } + + private static Properties getWorkflowProperties(Feed feed, Cluster cluster) throws FalconException { + final Storage storage = FeedHelper.createStorage(cluster, feed); + Properties props = new Properties(); + props.setProperty("srcClusterName", "NA"); + props.setProperty("availabilityFlag", "NA"); + props.put("timeZone", feed.getTimezone().getID()); + props.put("frequency", feed.getFrequency().getTimeUnit().name()); + props.put("falconFeedStorageType", storage.getType().name()); + props.put("limit", new AgeBasedDelete().getRetentionLimit(feed, cluster.getName()).toString()); + props.put("falconInputFeeds", feed.getName()); + props.put("falconInPaths", OozieBuilderUtils.IGNORE); + + String feedDataPath = storage.getUriTemplate(); + props.put("feedDataPath", + feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX)); + + props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), feed.getName()); + props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OozieBuilderUtils.IGNORE); + + return props; + } + + private static void setupHiveCredentials(Cluster cluster, Path wfPath, WORKFLOWAPP workflowApp) + throws FalconException { + if (SecurityUtil.isSecurityEnabled()) { + // add hcatalog credentials for secure 
mode and add a reference to each action + OozieBuilderUtils.addHCatalogCredentials(workflowApp, cluster, OozieBuilderUtils.HIVE_CREDENTIAL_NAME); + } + + // create hive-site.xml file so actions can use it in the classpath + OozieBuilderUtils.createHiveConfiguration(cluster, wfPath, ""); // no prefix since only one hive instance + + for (Object object : workflowApp.getDecisionOrForkOrJoin()) { + if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) { + continue; + } + + org.apache.falcon.oozie.workflow.ACTION action = + (org.apache.falcon.oozie.workflow.ACTION) object; + String actionName = action.getName(); + if (EVICTION_ACTION_NAME.equals(actionName)) { + // add reference to hive-site conf to each action + action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml"); + + if (SecurityUtil.isSecurityEnabled()) { + // add a reference to credential in the action + action.setCred(OozieBuilderUtils.HIVE_CREDENTIAL_NAME); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java ---------------------------------------------------------------------- diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java new file mode 100644 index 0000000..be9175e --- /dev/null +++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java @@ -0,0 +1,556 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.lifecycle.engine.oozie.utils; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.Tag; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.ExternalId; +import org.apache.falcon.entity.HiveUtil; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; +import org.apache.falcon.entity.v0.cluster.Interfacetype; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.oozie.coordinator.CONFIGURATION; +import org.apache.falcon.oozie.coordinator.COORDINATORAPP; +import org.apache.falcon.oozie.workflow.ACTION; +import org.apache.falcon.oozie.workflow.CREDENTIAL; +import org.apache.falcon.oozie.workflow.CREDENTIALS; +import org.apache.falcon.oozie.workflow.END; +import org.apache.falcon.oozie.workflow.KILL; +import org.apache.falcon.oozie.workflow.START; +import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.security.SecurityUtil; +import org.apache.falcon.util.RuntimeProperties; +import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.WorkflowExecutionArgs; +import org.apache.falcon.workflow.WorkflowExecutionContext; +import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; +import 
org.apache.falcon.workflow.util.OozieConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.OozieClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import javax.xml.namespace.QName;
import javax.xml.transform.stream.StreamSource;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringWriter;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Utility class to build oozie artifacts: workflow.xml, coordinator.xml, config-default.xml,
 * hive-site.xml and HCatalog credentials for feed lifecycle workflows.
 */
public final class OozieBuilderUtils {
    private static final Logger LOG = LoggerFactory.getLogger(OozieBuilderUtils.class);

    // Classpath resource for the shared post-processing action template.
    private static final String POSTPROCESS_TEMPLATE = "/action/post-process.xml";

    public static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
    private static final String USER_JMS_NOTIFICATION_ENABLED = "userJMSNotificationEnabled";
    public static final String MR_QUEUE_NAME = "queueName";
    public static final String MR_JOB_PRIORITY = "jobPriority";
    // Oozie coordinator EL expressions used to stamp nominal/actual times on properties.
    private static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
    private static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
    // Default JMS broker message TTL: 3 days expressed in minutes.
    private static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;

    // JAXBContext creation is expensive; contexts are built once in the static initializer.
    private static final JAXBContext WORKFLOW_JAXB_CONTEXT;
    private static final JAXBContext ACTION_JAXB_CONTEXT;
    private static final JAXBContext COORD_JAXB_CONTEXT;
    private static final JAXBContext CONFIG_JAXB_CONTEXT;


    public static final String SUCCESS_POSTPROCESS_ACTION_NAME = "succeeded-post-processing";
    public static final String FAIL_POSTPROCESS_ACTION_NAME = "failed-post-processing";
    public static final String OK_ACTION_NAME = "end";
    public static final String FAIL_ACTION_NAME = "fail";


    public static final String ENTITY_PATH = "ENTITY_PATH";
    public static final String ENTITY_NAME = "ENTITY_NAME";
    public static final String IGNORE = "IGNORE";


    static {
        try {
            WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
            ACTION_JAXB_CONTEXT = JAXBContext.newInstance(org.apache.falcon.oozie.workflow.ACTION.class);
            COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
            CONFIG_JAXB_CONTEXT = JAXBContext.newInstance(org.apache.falcon.oozie.workflow.CONFIGURATION.class);
        } catch (JAXBException e) {
            // Fail hard at class-load time: nothing in this class works without the contexts.
            throw new RuntimeException("Unable to create JAXB context", e);
        }
    }

    private OozieBuilderUtils() {
        // utility class - no instances
    }

    /**
     * Sets the ok/error transitions of the action in place and returns the same instance.
     */
    public static ACTION addTransition(ACTION action, String ok, String fail) {
        // XTODOS : why return when it is changing the same object?
        action.getOk().setTo(ok);
        action.getError().setTo(fail);
        return action;
    }


    /**
     * Names the workflow, wires its start/end nodes and appends the standard kill node.
     */
    public static void decorateWorkflow(WORKFLOWAPP wf, String name, String startAction) {
        wf.setName(name);
        wf.setStart(new START());
        wf.getStart().setTo(startAction);

        wf.setEnd(new END());
        wf.getEnd().setName(OK_ACTION_NAME);

        KILL kill = new KILL();
        kill.setName(FAIL_ACTION_NAME);
        kill.setMessage("Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
        wf.getDecisionOrForkOrJoin().add(kill);
    }

    /**
     * Unmarshals the post-processing template and adds oozie retries.
     * NOTE(review): unlike getFailPostProcessAction, the name is not overridden here -
     * presumably the template's action name already is "succeeded-post-processing"; confirm.
     */
    public static ACTION getSuccessPostProcessAction() throws FalconException {
        ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
        decorateWithOozieRetries(action);
        return action;
    }

    /**
     * Unmarshals the post-processing template, adds oozie retries and renames it
     * to the failure post-processing action.
     */
    public static ACTION getFailPostProcessAction() throws FalconException {
        ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
        decorateWithOozieRetries(action);
        action.setName(FAIL_POSTPROCESS_ACTION_NAME);
        return action;
    }

    /**
     * Marshals the given JAXB element to outPath on the cluster's file system
     * (via a proxied FileSystem) and returns the written path.
     */
    private static Path marshal(Cluster cluster, JAXBElement<?> jaxbElement,
                                JAXBContext jaxbContext, Path outPath) throws FalconException {
        try {
            Marshaller marshaller = jaxbContext.createMarshaller();
            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);

            if (LOG.isDebugEnabled()) {
                // Marshal twice in debug mode: once to log, once to the file below.
                StringWriter writer = new StringWriter();
                marshaller.marshal(jaxbElement, writer);
                LOG.debug("Writing definition to {} on cluster {}", outPath, cluster.getName());
                LOG.debug(writer.getBuffer().toString());
            }

            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
                    outPath.toUri(), ClusterHelper.getConfiguration(cluster));
            OutputStream out = fs.create(outPath);
            try {
                marshaller.marshal(jaxbElement, out);
            } finally {
                out.close();
            }

            LOG.info("Marshalled {} to {}", jaxbElement.getDeclaredType(), outPath);
            return outPath;
        } catch (Exception e) {
            throw new FalconException("Unable to marshall app object", e);
        }
    }

    /**
     * Writes the coordinator app as coordinator.xml under outPath.
     */
    public static Path marshalCoordinator(Cluster cluster, COORDINATORAPP coord, Path outPath) throws FalconException {
        return marshal(cluster, new org.apache.falcon.oozie.coordinator.ObjectFactory().createCoordinatorApp(coord),
                COORD_JAXB_CONTEXT, new Path(outPath, "coordinator.xml"));
    }


    /**
     * Writes the given properties as config-default.xml under outPath, using the
     * workflow app's namespace for the configuration element.
     */
    public static Path marshalDefaultConfig(Cluster cluster, WORKFLOWAPP workflowapp,
                                            Properties properties, Path outPath) throws FalconException {
        QName workflowQName = new org.apache.falcon.oozie.workflow.ObjectFactory()
                .createWorkflowApp(workflowapp).getName();
        org.apache.falcon.oozie.workflow.CONFIGURATION config = getWorkflowConfig(properties);
        // NOTE(review): raw JAXBElement construction - unchecked warning; left as-is.
        JAXBElement<org.apache.falcon.oozie.workflow.CONFIGURATION> configJaxbElement =
                new JAXBElement(new QName(workflowQName.getNamespaceURI(), "configuration", workflowQName.getPrefix()),
                        org.apache.falcon.oozie.workflow.CONFIGURATION.class, config);

        Path defaultConfigPath = new Path(outPath, "config-default.xml");
        return marshal(cluster, configJaxbElement, CONFIG_JAXB_CONTEXT, defaultConfigPath);
    }


    /**
     * Writes the workflow app as workflow.xml under outPath.
     * NOTE(review): method name misspells "Workflow"; it is public API, so left unchanged.
     */
    public static Path marshalWokflow(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
        return marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
                WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
    }

    /**
     * Unmarshals a classpath template resource into an instance of cls.
     */
    public static <T> T unmarshal(String template, JAXBContext context, Class<T> cls) throws FalconException {
        InputStream resourceAsStream = null;
        try {
            resourceAsStream = OozieBuilderUtils.class.getResourceAsStream(template);
            Unmarshaller unmarshaller = context.createUnmarshaller();
            JAXBElement<T> jaxbElement = unmarshaller.unmarshal(new StreamSource(resourceAsStream), cls);
            return jaxbElement.getValue();
        } catch (JAXBException e) {
            throw new FalconException("Failed to unmarshal " + template, e);
        } finally {
            IOUtils.closeQuietly(resourceAsStream);
        }
    }

    /** Unmarshals a workflow action template from the classpath. */
    public static ACTION unmarshalAction(String template) throws FalconException {
        return unmarshal(template, ACTION_JAXB_CONTEXT, ACTION.class);
    }

    // XTODOS Should we make them more specific to feeds??
    /**
     * Adds jars from the cluster's libext directories (libext, libext/&lt;TYPE&gt; and
     * libext/&lt;TYPE&gt;/&lt;tag&gt;) as extension files on every action of the workflow.
     */
    public static void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag, EntityType type)
        throws FalconException {
        String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext";
        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
                ClusterHelper.getConfiguration(cluster));
        try {
            addExtensionJars(fs, new Path(libext), wf);
            addExtensionJars(fs, new Path(libext, type.name()), wf);
            if (tag != null) {
                addExtensionJars(fs, new Path(libext, type.name() + "/" + tag.name().toLowerCase()),
                        wf);
            }
        } catch (IOException e) {
            throw new FalconException(e);
        }
    }

    /**
     * Builds the seed properties identifying the built entity.
     *
     * @param path build path of the entity's artifacts; null yields a null result
     * @param name workflow/entity name
     * @return properties with ENTITY_PATH and ENTITY_NAME set, or null when path is null
     */
    public static Properties getProperties(Path path, String name) {
        if (path == null) {
            return null;
        }
        Properties prop = new Properties();
        prop.setProperty(ENTITY_PATH, path.toString());
        prop.setProperty(ENTITY_NAME, name);
        return prop;
    }


    /**
     * Adds every jar found directly under path (e.g. libext, libext/FEED,
     * libext/FEED/RETENTION) to the file list of each java/pig/map-reduce action
     * in the workflow.
     *
     * @param fs   file system to list the path on
     * @param path directory containing extension jars
     * @param wf   workflow whose actions get the jars
     * @throws IOException on listing failures other than a missing directory
     */
    public static void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
        FileStatus[] libs;
        try {
            libs = fs.listStatus(path);
        } catch (FileNotFoundException ignore) {
            //Ok if the libext is not configured
            return;
        }

        for (FileStatus lib : libs) {
            if (lib.isDirectory()) {
                continue;
            }

            for (Object obj : wf.getDecisionOrForkOrJoin()) {
                if (!(obj instanceof ACTION)) {
                    continue;
                }
                ACTION action = (ACTION) obj;
                List<String> files = null;
                if (action.getJava() != null) {
                    files = action.getJava().getFile();
                } else if (action.getPig() != null) {
                    files = action.getPig().getFile();
                } else if (action.getMapReduce() != null) {
                    files = action.getMapReduce().getFile();
                }
                if (files != null) {
                    files.add(lib.getPath().toString());
                }
            }
        }
    }


    /**
     * Applies the configured oozie retry max/interval to the action.
     */
    public static void decorateWithOozieRetries(ACTION action) {
        Properties props = RuntimeProperties.get();
        action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
        action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
    }

    // creates the default configuration which is written in config-default.xml
    /**
     * Builds the default workflow configuration (entity identity, log dir, engine URL,
     * late-data and broker properties, default queue/priority) for config-default.xml.
     * Entity-level properties override the generated defaults.
     */
    public static Properties createDefaultConfiguration(Cluster cluster, Entity entity,
                                    WorkflowExecutionContext.EntityOperations operation) throws FalconException {
        Properties props = new Properties();
        props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName());
        props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
        props.put("falconDataOperation", operation.name());

        props.put(WorkflowExecutionArgs.LOG_DIR.getName(),
                getStoragePath(EntityUtil.getLogPath(cluster, entity)));
        props.put(WorkflowExecutionArgs.WF_ENGINE_URL.getName(), ClusterHelper.getOozieUrl(cluster));

        addLateDataProperties(props, entity);
        addBrokerProperties(cluster, props);

        props.put(MR_QUEUE_NAME, "default");
        props.put(MR_JOB_PRIORITY, "NORMAL");

        //properties provided in entity override the default generated properties
        props.putAll(EntityUtil.getEntityProperties(entity));
        props.putAll(createAppProperties(cluster));
        return props;
    }


    // gets the cluster specific properties to be populated in config-default.xml
    private static Properties createAppProperties(Cluster cluster) throws FalconException {
        Properties properties = EntityUtil.getEntityProperties(cluster);
        properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
        properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
        properties.setProperty("colo.name", cluster.getColo());
        // the system libpath is only needed when talking to a real (non-local) oozie
        final String endpoint = ClusterHelper.getInterface(cluster, Interfacetype.WORKFLOW).getEndpoint();
        if (!OozieConstants.LOCAL_OOZIE.equals(endpoint)) {
            properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
        }
        properties.setProperty("falcon.libpath",
                ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/lib");

        return properties;
    }

    // creates hive-site.xml configuration in conf dir for the given cluster on the same cluster.
    public static void createHiveConfiguration(Cluster cluster, Path workflowPath,
                                               String prefix) throws FalconException {
        Configuration hiveConf = getHiveCredentialsAsConf(cluster);

        try {
            Configuration conf = ClusterHelper.getConfiguration(cluster);
            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);

            // create hive conf to stagingDir
            Path confPath = new Path(workflowPath + "/conf");

            persistHiveConfiguration(fs, confPath, hiveConf, prefix);
        } catch (IOException e) {
            // NOTE(review): message contains duplicated word "create create"; it is a runtime
            // string, so it is deliberately left byte-identical here.
            throw new FalconException("Unable to create create hive site", e);
        }
    }

    // writes hiveConf as <prefix>hive-site.xml under confPath
    private static void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
                                                 String prefix) throws IOException {
        OutputStream out = null;
        try {
            out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
            hiveConf.writeXml(out);
        } finally {
            IOUtils.closeQuietly(out);
        }
    }

    /**
     * This is only necessary if table is involved and is secure mode.
     *
     * @param workflowApp workflow xml
     * @param cluster cluster entity
     * @param credentialName name under which the hcat credential is registered
     */
    public static void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster, String credentialName) {
        CREDENTIALS credentials = workflowApp.getCredentials();
        if (credentials == null) {
            credentials = new CREDENTIALS();
        }

        credentials.getCredential().add(createHCatalogCredential(cluster, credentialName));

        // add credential for workflow
        workflowApp.setCredentials(credentials);
    }


    /**
     * This is only necessary if table is involved and is secure mode.
     *
     * @param cluster cluster entity
     * @param credentialName credential name
     * @return CREDENTIAL object of type "hcat" pointing at the cluster's metastore
     */
    public static CREDENTIAL createHCatalogCredential(Cluster cluster, String credentialName) {
        final String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);

        CREDENTIAL credential = new CREDENTIAL();
        credential.setName(credentialName);
        credential.setType("hcat");

        credential.getProperty().add(createProperty(HiveUtil.METASTROE_URI, metaStoreUrl));
        credential.getProperty().add(createProperty(SecurityUtil.METASTORE_PRINCIPAL,
                ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL)));

        return credential;
    }

    /** Builds a single name/value credential property. */
    public static CREDENTIAL.Property createProperty(String name, String value) {
        CREDENTIAL.Property property = new CREDENTIAL.Property();
        property.setName(name);
        property.setValue(value);
        return property;
    }

    /**
     * Builds the hive connection properties for the cluster's registry endpoint,
     * including kerberos settings when security is enabled.
     *
     * @throws IllegalStateException when the cluster has no registry interface
     */
    private static Properties getHiveCredentials(Cluster cluster) {
        String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
        if (metaStoreUrl == null) {
            throw new IllegalStateException("Registry interface is not defined in cluster: " + cluster.getName());
        }

        Properties hiveCredentials = new Properties();
        hiveCredentials.put(HiveUtil.METASTOREURIS, metaStoreUrl);
        hiveCredentials.put(HiveUtil.METASTORE_UGI, "true");
        hiveCredentials.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat"));
        hiveCredentials.put(HiveUtil.METASTROE_URI, metaStoreUrl);

        if (SecurityUtil.isSecurityEnabled()) {
            String principal = ClusterHelper
                    .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
            hiveCredentials.put(SecurityUtil.METASTORE_PRINCIPAL, principal);
            hiveCredentials.put(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL, principal);
            hiveCredentials.put(SecurityUtil.METASTORE_USE_THRIFT_SASL, "true");
        }

        return hiveCredentials;
    }

    // converts the hive credential properties into a bare (no-defaults) hadoop Configuration
    private static Configuration getHiveCredentialsAsConf(Cluster cluster) {
        Properties hiveCredentials = getHiveCredentials(cluster);

        Configuration hiveConf = new Configuration(false);
        for (Map.Entry<Object, Object> entry : hiveCredentials.entrySet()) {
            hiveConf.set((String)entry.getKey(), (String)entry.getValue());
        }

        return hiveConf;
    }

    /** Returns the tag-specific subdirectory of the build path. */
    public static Path getBuildPath(Path buildPath, Tag tag) {
        return new Path(buildPath, tag.name());
    }

    /** Null-safe Path variant of {@link #getStoragePath(String)}. */
    protected static String getStoragePath(Path path) {
        if (path != null) {
            return getStoragePath(path.toString());
        }
        return null;
    }

    /**
     * Prefixes scheme-less paths with ${nameNode} so oozie resolves them on the
     * cluster's file system; already-qualified paths are returned unchanged.
     */
    public static String getStoragePath(String path) {
        if (StringUtils.isNotEmpty(path)) {
            if (new Path(path).toUri().getScheme() == null && !path.startsWith("${nameNode}")) {
                path = "${nameNode}" + path;
            }
        }
        return path;
    }

    // default configuration for coordinator
    /**
     * Builds the default coordinator configuration: nominal/actual time ELs, the oozie
     * external id and the user JMS notification flag. Entity properties override these.
     */
    public static Properties createCoordDefaultConfiguration(String coordName, Entity entity)
        throws FalconException {

        Properties props = new Properties();
        props.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME_EL);
        props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL);
        props.put(OozieClient.EXTERNAL_ID,
                new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
                        "${coord:nominalTime()}").getId());
        props.put(USER_JMS_NOTIFICATION_ENABLED, "true");
        //props in entity override the set props.
        props.putAll(EntityUtil.getEntityProperties(entity));
        return props;
    }

    // sets shouldRecord=true only when the entity declares late inputs
    private static void addLateDataProperties(Properties props, Entity entity) throws FalconException {
        if (EntityUtil.getLateProcess(entity) == null
                || EntityUtil.getLateProcess(entity).getLateInputs() == null
                || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
            props.put("shouldRecord", "false");
        } else {
            props.put("shouldRecord", "true");
        }
    }

    // populates user and falcon JMS broker urls, impl classes and message TTL
    private static void addBrokerProperties(Cluster cluster, Properties props) {
        props.put(WorkflowExecutionArgs.USER_BRKR_URL.getName(),
                ClusterHelper.getMessageBrokerUrl(cluster));
        props.put(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(),
                ClusterHelper.getMessageBrokerImplClass(cluster));

        String falconBrokerUrl = StartupProperties.get().getProperty(
                "broker.url", "tcp://localhost:61616?daemon=true");
        props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);

        String falconBrokerImplClass = StartupProperties.get().getProperty(
                "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
        props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);

        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
                DEFAULT_BROKER_MSG_TTL.toString());
        props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
    }


    // converts Properties into a workflow-namespace CONFIGURATION element
    private static org.apache.falcon.oozie.workflow.CONFIGURATION getWorkflowConfig(Properties props) {
        org.apache.falcon.oozie.workflow.CONFIGURATION conf = new org.apache.falcon.oozie.workflow.CONFIGURATION();
        for (Map.Entry<Object, Object> prop : props.entrySet()) {
            org.apache.falcon.oozie.workflow.CONFIGURATION.Property confProp =
                    new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
            confProp.setName((String) prop.getKey());
            confProp.setValue((String) prop.getValue());
            conf.getProperty().add(confProp);
        }
        return conf;
    }

    // converts Properties into a coordinator-namespace CONFIGURATION element
    public static CONFIGURATION getCoordinatorConfig(Properties props) {
        CONFIGURATION conf = new CONFIGURATION();
        for (Map.Entry<Object, Object> prop : props.entrySet()) {
            CONFIGURATION.Property confProp = new CONFIGURATION.Property();
            confProp.setName((String) prop.getKey());
            confProp.setValue((String) prop.getValue());
            conf.getProperty().add(confProp);
        }
        return conf;
    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/resources/action/feed/eviction-action.xml ---------------------------------------------------------------------- diff --git a/lifecycle/src/main/resources/action/feed/eviction-action.xml b/lifecycle/src/main/resources/action/feed/eviction-action.xml new file mode 100644 index 0000000..4ab67d2 --- /dev/null +++ b/lifecycle/src/main/resources/action/feed/eviction-action.xml @@ -0,0 +1,59 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements.  See the NOTICE file + distributed with this work for additional information + regarding copyright ownership.  The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License.  You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License.
+ --> +<action name="eviction" xmlns="uri:oozie:workflow:0.3"> + <java> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <configuration> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.priority</name> + <value>${jobPriority}</value> + </property> + <!-- HCatalog jars --> + <property> + <name>oozie.action.sharelib.for.java</name> + <value>hcatalog</value> + </property> + <property> + <name>oozie.launcher.oozie.libpath</name> + <value>${wf:conf("falcon.libpath")}</value> + </property> + </configuration> + <main-class>org.apache.falcon.retention.FeedEvictor</main-class> + <arg>-feedBasePath</arg> + <arg>${feedDataPath}</arg> + <arg>-falconFeedStorageType</arg> + <arg>${falconFeedStorageType}</arg> + <arg>-retentionType</arg> + <arg>instance</arg> + <arg>-retentionLimit</arg> + <arg>${limit}</arg> + <arg>-timeZone</arg> + <arg>${timeZone}</arg> + <arg>-frequency</arg> + <arg>${frequency}</arg> + <arg>-logFile</arg> + <arg>${logDir}/job-${nominalTime}/${wf:run()}/evicted-instancePaths.csv</arg> + </java> + <ok to="succeeded-post-processing"/> + <error to="failed-post-processing"/> +</action> http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/resources/binding/jaxb-binding.xjb ---------------------------------------------------------------------- diff --git a/lifecycle/src/main/resources/binding/jaxb-binding.xjb b/lifecycle/src/main/resources/binding/jaxb-binding.xjb new file mode 100644 index 0000000..1a43660 --- /dev/null +++ b/lifecycle/src/main/resources/binding/jaxb-binding.xjb @@ -0,0 +1,26 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<jaxb:bindings xmlns:xs="http://www.w3.org/2001/XMLSchema" + xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" version="2.1"> + + <jaxb:bindings schemaLocation="../../../../target/oozie-schemas/oozie-workflow-0.3.xsd" + node="//xs:complexType[@name='ACTION']/xs:sequence/xs:any[@namespace='uri:oozie:sla:0.1']"> + <jaxb:property name="anySLA"/> + </jaxb:bindings> +</jaxb:bindings> http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/test/java/org/apache/falcon/lifecycle/retention/AgeBasedDeleteTest.java ---------------------------------------------------------------------- diff --git a/lifecycle/src/test/java/org/apache/falcon/lifecycle/retention/AgeBasedDeleteTest.java b/lifecycle/src/test/java/org/apache/falcon/lifecycle/retention/AgeBasedDeleteTest.java new file mode 100644 index 0000000..cf90f04 --- /dev/null +++ b/lifecycle/src/test/java/org/apache/falcon/lifecycle/retention/AgeBasedDeleteTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.lifecycle.retention; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.parser.ValidationException; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.feed.Cluster; +import org.apache.falcon.entity.v0.feed.Clusters; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.LateArrival; +import org.apache.falcon.entity.v0.feed.Lifecycle; +import org.apache.falcon.entity.v0.feed.Properties; +import org.apache.falcon.entity.v0.feed.Property; +import org.apache.falcon.entity.v0.feed.RetentionStage; +import org.apache.falcon.entity.v0.feed.Sla; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Tests for AgeBasedDelete Policy validations. 
+ */ +public class AgeBasedDeleteTest { + private static Feed feed; + private static String clusterName = "testCluster"; + + @BeforeMethod + private void init() { + feed = new Feed(); + Cluster cluster = new Cluster(); + cluster.setName(clusterName); + + Property property = new Property(); + property.setName(AgeBasedDelete.LIMIT_PROPERTY_NAME); + property.setValue("hours(3)"); + + Properties properties = new Properties(); + properties.getProperties().add(property); + + RetentionStage retentionStage = new RetentionStage(); + retentionStage.setProperties(properties); + + Lifecycle lifecycle = new Lifecycle(); + lifecycle.setRetentionStage(retentionStage); + + cluster.setLifecycle(lifecycle); + + Clusters clusters = new Clusters(); + clusters.getClusters().add(cluster); + feed.setClusters(clusters); + + //set sla + Sla sla = new Sla(); + sla.setSlaLow(new Frequency("hours(3)")); + sla.setSlaHigh(new Frequency("hours(3)")); + feed.setSla(sla); + + // set late data arrival + LateArrival lateArrival = new LateArrival(); + lateArrival.setCutOff(new Frequency("hours(3)")); + feed.setLateArrival(lateArrival); + } + + @Test(expectedExceptions = ValidationException.class, + expectedExceptionsMessageRegExp = ".*slaHigh of Feed:.*") + public void testSlaValidation() throws FalconException { + feed.getSla().setSlaHigh(new Frequency("hours(4)")); + new AgeBasedDelete().validate(feed, clusterName); + } + + @Test(expectedExceptions = ValidationException.class, + expectedExceptionsMessageRegExp = ".*Feed's retention limit:.*") + public void testLateDataValidation() throws FalconException { + feed.getLateArrival().setCutOff(new Frequency("hours(4)")); + new AgeBasedDelete().validate(feed, clusterName); + } + + @Test(expectedExceptions = FalconException.class, + expectedExceptionsMessageRegExp = ".*Invalid value for property.*") + public void testValidateLimit() throws FalconException { + 
feed.getClusters().getClusters().get(0).getLifecycle().getRetentionStage().getProperties().getProperties() + .get(0).setValue("invalid"); + new AgeBasedDelete().validate(feed, clusterName); + } + + @Test(expectedExceptions = FalconException.class, expectedExceptionsMessageRegExp = ".*limit is required.*") + public void testStageValidity() throws Exception { + feed.getClusters().getClusters().get(0).getLifecycle().getRetentionStage().getProperties().getProperties() + .clear(); + new AgeBasedDelete().validate(feed, clusterName); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/pom.xml ---------------------------------------------------------------------- diff --git a/oozie/pom.xml b/oozie/pom.xml index 157edf9..40c0a3e 100644 --- a/oozie/pom.xml +++ b/oozie/pom.xml @@ -90,6 +90,13 @@ <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-feed-lifecycle</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java index b819dee..9e55edf 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java @@ -20,10 +20,14 @@ package org.apache.falcon.oozie.feed; import org.apache.falcon.FalconException; import org.apache.falcon.Tag; +import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Lifecycle; +import org.apache.falcon.lifecycle.LifecyclePolicy; 
import org.apache.falcon.oozie.OozieBundleBuilder; import org.apache.falcon.oozie.OozieCoordinatorBuilder; +import org.apache.falcon.service.LifecyclePolicyMap; import org.apache.hadoop.fs.Path; import java.util.ArrayList; @@ -38,16 +42,32 @@ public class FeedBundleBuilder extends OozieBundleBuilder<Feed> { super(entity); } - @Override protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException { - List<Properties> props = new ArrayList<Properties>(); - List<Properties> evictionProps = - OozieCoordinatorBuilder.get(entity, Tag.RETENTION).buildCoords(cluster, buildPath); - if (evictionProps != null) { - props.addAll(evictionProps); + @Override + protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException { + // if feed has lifecycle defined - then use it to create coordinator and wf else fall back + List<Properties> props = new ArrayList<>(); + Lifecycle lifecycle = this.entity.getLifecycle(); + if (lifecycle != null) { + for (String name : FeedHelper.getPolicies(this.entity, cluster.getName())) { + LifecyclePolicy policy = LifecyclePolicyMap.get().get(name); + if (policy == null) { + LOG.error("Couldn't find lifecycle policy for name:{}", name); + throw new FalconException("Invalid policy name " + name); + } + Properties appProps = policy.build(cluster, buildPath, this.entity); + if (appProps != null) { + props.add(appProps); + } + } + } else { + List<Properties> evictionProps = + OozieCoordinatorBuilder.get(entity, Tag.RETENTION).buildCoords(cluster, buildPath); + if (evictionProps != null) { + props.addAll(evictionProps); + } } - - List<Properties> replicationProps = OozieCoordinatorBuilder.get(entity, Tag.REPLICATION).buildCoords(cluster, - buildPath); + List<Properties> replicationProps = OozieCoordinatorBuilder.get(entity, Tag.REPLICATION) + .buildCoords(cluster, buildPath); if (replicationProps != null) { props.addAll(replicationProps); } @@ -55,7 +75,6 @@ public class FeedBundleBuilder 
extends OozieBundleBuilder<Feed> { if (!props.isEmpty()) { copySharedLibs(cluster, new Path(getLibPath(buildPath))); } - return props; } http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java index 7d0174a..cfce1ae 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java @@ -49,6 +49,7 @@ import org.apache.falcon.oozie.workflow.JAVA; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.security.SecurityUtil; +import org.apache.falcon.service.LifecyclePolicyMap; import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowExecutionArgs; @@ -88,12 +89,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { private Feed feed; private Feed tableFeed; private Feed fsReplFeed; + private Feed lifecycleRetentionFeed; + private Feed retentionFeed; private static final String SRC_CLUSTER_PATH = "/feed/src-cluster.xml"; private static final String TRG_CLUSTER_PATH = "/feed/trg-cluster.xml"; private static final String FEED = "/feed/feed.xml"; private static final String TABLE_FEED = "/feed/table-replication-feed.xml"; private static final String FS_REPLICATION_FEED = "/feed/fs-replication-feed.xml"; + private static final String FS_RETENTION_LIFECYCLE_FEED = "/feed/fs-retention-lifecycle-feed.xml"; + private static final String FS_RETENTION_ORIG_FEED = "/feed/fs-retention-feed.xml"; @BeforeClass public void setUpDFS() throws Exception { @@ -105,6 
+110,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { trgMiniDFS = EmbeddedCluster.newCluster("cluster2"); String trgHdfsUrl = trgMiniDFS.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY); + LifecyclePolicyMap.get().init(); cleanupStore(); org.apache.falcon.entity.v0.cluster.Property property = @@ -124,6 +130,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { feed = (Feed) storeEntity(EntityType.FEED, FEED); fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED); tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED); + lifecycleRetentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_LIFECYCLE_FEED); + retentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_ORIG_FEED); } private Entity storeEntity(EntityType type, String resource) throws Exception { @@ -150,6 +158,32 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { } @Test + public void testRetentionWithLifecycle() throws Exception { + OozieEntityBuilder builder = OozieEntityBuilder.get(lifecycleRetentionFeed); + Path bundlePath = new Path("/projects/falcon/"); + builder.build(trgCluster, bundlePath); + + BUNDLEAPP bundle = getBundle(trgMiniDFS.getFileSystem(), bundlePath); + List<COORDINATOR> coords = bundle.getCoordinator(); + + COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getAppPath()); + assertLibExtensions(coord, "retention"); + HashMap<String, String> props = getCoordProperties(coord); + Assert.assertEquals(props.get("ENTITY_PATH"), bundlePath.toString() + "/RETENTION"); + Assert.assertEquals(coord.getFrequency(), "${coord:hours(17)}"); + Assert.assertEquals(coord.getEnd(), "2099-01-01T00:00Z"); + Assert.assertEquals(coord.getTimezone(), "UTC"); + + HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord); + Assert.assertEquals(wfProps.get("feedNames"), lifecycleRetentionFeed.getName()); + 
Assert.assertTrue(StringUtils.equals(wfProps.get("entityType"), EntityType.FEED.name())); + Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon"); + Assert.assertEquals(wfProps.get("queueName"), "retention"); + Assert.assertEquals(wfProps.get("limit"), "hours(2)"); + Assert.assertEquals(wfProps.get("jobPriority"), "LOW"); + } + + @Test public void testReplicationCoordsForFSStorage() throws Exception { OozieEntityBuilder builder = OozieEntityBuilder.get(feed); Path bundlePath = new Path("/projects/falcon/"); http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/src/test/resources/feed/fs-retention-feed.xml ---------------------------------------------------------------------- diff --git a/oozie/src/test/resources/feed/fs-retention-feed.xml b/oozie/src/test/resources/feed/fs-retention-feed.xml new file mode 100644 index 0000000..7eb85fa --- /dev/null +++ b/oozie/src/test/resources/feed/fs-retention-feed.xml @@ -0,0 +1,50 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<feed description="lifecycle original retention feed" name="retention-test" xmlns="uri:falcon:feed:0.1"> + <partitions> + <partition name="colo"/> + <partition name="eventTime"/> + <partition name="impressionHour"/> + <partition name="pricingModel"/> + </partitions> + + <frequency>minutes(5)</frequency> + <timezone>UTC</timezone> + <late-arrival cut-off="minutes(1)"/> + + <clusters> + <cluster partition="${cluster.colo}" type="source" name="corp1"> + <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/> + <retention action="delete" limit="days(10000)"/> + </cluster> + </clusters> + + <locations> + <location path="/data/lifecycle/" type="data"/> + <location path="/data/regression/fetlrc/billing/stats" type="stats"/> + <location path="/data/regression/fetlrc/billing/metadata" type="meta"/> + </locations> + + <ACL permission="0x755" group="group" owner="fetl"/> + <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/> + <properties> + <property name="maxMaps" value="33" /> + <property name="mapBandwidth" value="2" /> + </properties> + +</feed> http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml ---------------------------------------------------------------------- diff --git a/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml b/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml new file mode 100644 index 0000000..2cadfe0 --- /dev/null +++ b/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml @@ -0,0 +1,60 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<feed description="lifecycle retention feed" name="retention-lifecycle-test" xmlns="uri:falcon:feed:0.1"> + <partitions> + <partition name="colo"/> + <partition name="eventTime"/> + <partition name="impressionHour"/> + <partition name="pricingModel"/> + </partitions> + + <frequency>minutes(5)</frequency> + <timezone>UTC</timezone> + <late-arrival cut-off="minutes(1)"/> + + <clusters> + <cluster partition="${cluster.colo}" type="source" name="corp2"> + <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/> + <retention action="delete" limit="days(10000)"/> + </cluster> + </clusters> + + <locations> + <location path="/data/lifecycle/" type="data"/> + <location path="/data/regression/fetlrc/billing/stats" type="stats"/> + <location path="/data/regression/fetlrc/billing/metadata" type="meta"/> + </locations> + + <ACL permission="0x755" group="group" owner="fetl"/> + <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/> + <properties> + <property name="maxMaps" value="33" /> + <property name="mapBandwidth" value="2" /> + </properties> + + <lifecycle> + <retention-stage> + <frequency>hours(17)</frequency> + <queue>retention</queue> + <priority>LOW</priority> + <properties> + <property name="retention.policy.agebaseddelete.limit" value="hours(2)"></property> + </properties> + </retention-stage> + </lifecycle> +</feed> http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8127b46..9cdfc87 100644 --- a/pom.xml +++ 
b/pom.xml @@ -434,6 +434,7 @@ <module>rerun</module> <module>prism</module> <module>unit</module> + <module>lifecycle</module> <module>webapp</module> <module>docs</module> </modules> @@ -844,6 +845,12 @@ <dependency> <groupId>org.apache.falcon</groupId> + <artifactId>falcon-feed-lifecycle</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> <artifactId>falcon-process</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 8f3bc35..9c6aef7 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -26,6 +26,7 @@ ## DONT MODIFY UNLESS SURE ABOUT CHANGE ## *.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine +*.lifecycle.engine.impl=org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory *.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder *.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder *.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager @@ -49,6 +50,12 @@ ##### Prism Services ##### prism.application.services=org.apache.falcon.entity.store.ConfigurationStore + +# List of Lifecycle policies configured. +*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete +# List of builders for the policies. +*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder + ##### Falcon Configuration Store Change listeners ##### *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ org.apache.falcon.entity.ColoClusterRelation,\
