Repository: falcon Updated Branches: refs/heads/master b36a82394 -> 9f69ae271
FALCON-1170 Falcon Native Scheduler - Refactor existing workflow/coord/bundle builder. Contributed by Pallavi Rao Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9f69ae27 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9f69ae27 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9f69ae27 Branch: refs/heads/master Commit: 9f69ae27159436721c3fa1fdc401bb0de0cdca80 Parents: b36a823 Author: Ajay Yadava <[email protected]> Authored: Mon Jul 13 14:13:38 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Mon Jul 13 14:27:03 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/falcon/oozie/OozieBundleBuilder.java | 38 +------ .../falcon/oozie/OozieCoordinatorBuilder.java | 70 ++---------- .../apache/falcon/oozie/OozieEntityBuilder.java | 32 ++++-- .../OozieOrchestrationWorkflowBuilder.java | 108 +++++++++++++++++-- .../feed/FSReplicationWorkflowBuilder.java | 12 +++ .../feed/FeedReplicationCoordinatorBuilder.java | 47 ++------ .../feed/FeedReplicationWorkflowBuilder.java | 44 +++++++- .../feed/FeedRetentionCoordinatorBuilder.java | 37 ++----- .../feed/FeedRetentionWorkflowBuilder.java | 42 +++++++- .../feed/HCatReplicationWorkflowBuilder.java | 8 ++ .../ProcessExecutionCoordinatorBuilder.java | 23 +--- .../ProcessExecutionWorkflowBuilder.java | 31 +++++- .../java/org/apache/falcon/util/OozieUtils.java | 6 +- .../feed/OozieFeedWorkflowBuilderTest.java | 64 ++++++----- .../falcon/oozie/process/AbstractTestBase.java | 55 +++++++--- .../OozieProcessWorkflowBuilderTest.java | 54 +++++----- 17 files changed, 403 insertions(+), 270 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e844a60..88fdfdd 100755 
--- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ Trunk (Unreleased) FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava) IMPROVEMENTS + FALCON-1170 Falcon Native Scheduler - Refactor existing workflow/coord/bundle builder(Pallavi Rao via Ajay Yadava) + FALCON-1031 Make post processing notifications to user topics optional (Pallavi Rao via Ajay Yadava) FALCON-1186 Add filtering capability to result of instance summary (Suhas Vasu) http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java index 03063f4..8026967 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java @@ -19,12 +19,10 @@ package org.apache.falcon.oozie; import org.apache.falcon.FalconException; -import org.apache.falcon.Tag; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.oozie.bundle.BUNDLEAPP; import org.apache.falcon.oozie.bundle.CONFIGURATION; @@ -83,9 +81,9 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu final String coordName = coordProps.getProperty(OozieEntityBuilder.ENTITY_NAME); coord.setName(coordName); coord.setAppPath(getStoragePath(coordPath)); - Properties appProps = createAppProperties(cluster, buildPath, coordName); - appProps.putAll(coordProps); - coord.setConfiguration(getConfig(appProps)); + coordProps.put(OozieClient.USER_NAME, CurrentUser.getUser()); 
+ coordProps.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster)); + coord.setConfiguration(getConfig(coordProps)); bundle.getCoordinator().add(coord); } @@ -114,35 +112,9 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu return conf; } - protected Properties createAppProperties(Cluster cluster, Path buildPath, - String coordName) throws FalconException { - Properties properties = getEntityProperties(cluster); - properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster)); - properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster)); - properties.setProperty("colo.name", cluster.getColo()); - - properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser()); - properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true"); - properties.setProperty("falcon.libpath", - ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/lib"); - - if (EntityUtil.isTableStorageType(cluster, entity)) { - Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity); - if (tag == Tag.REPLICATION) { - // todo: kludge send source hcat creds for coord dependency check to pass - String srcClusterName = EntityUtil.getWorkflowNameSuffix(coordName, entity); - properties.putAll(getHiveCredentials(ClusterHelper.getCluster(srcClusterName))); - } else { - properties.putAll(getHiveCredentials(cluster)); - } - } - - return properties; - } - protected Path marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException { return marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle), - OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml")); + OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml")); } //Used by coordinator builders to return multiple coords @@ -152,7 +124,7 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu public static 
BUNDLEAPP unmarshal(Cluster cluster, Path path) throws FalconException { try { FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - path.toUri(), ClusterHelper.getConfiguration(cluster)); + path.toUri(), ClusterHelper.getConfiguration(cluster)); Unmarshaller unmarshaller = OozieUtils.BUNDLE_JAXB_CONTEXT.createUnmarshaller(); @SuppressWarnings("unchecked") JAXBElement<BUNDLEAPP> jaxbElement = unmarshaller.unmarshal(new StreamSource(fs.open(path)), BUNDLEAPP.class); http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java index 92697b0..85f5330 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java @@ -21,23 +21,19 @@ package org.apache.falcon.oozie; import org.apache.falcon.FalconException; import org.apache.falcon.LifeCycle; import org.apache.falcon.Tag; -import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.ExternalId; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.oozie.coordinator.CONFIGURATION; -import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property; import org.apache.falcon.oozie.coordinator.COORDINATORAPP; import org.apache.falcon.oozie.coordinator.ObjectFactory; import org.apache.falcon.oozie.feed.FeedReplicationCoordinatorBuilder; import org.apache.falcon.oozie.feed.FeedRetentionCoordinatorBuilder; import org.apache.falcon.oozie.process.ProcessExecutionCoordinatorBuilder; import org.apache.falcon.util.OozieUtils; -import 
org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowExecutionArgs; -import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.fs.Path; import org.apache.oozie.client.OozieClient; @@ -52,9 +48,7 @@ import java.util.Properties; public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEntityBuilder<T> { protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}"; protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}"; - protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L; - protected static final String MR_QUEUE_NAME = "queueName"; - protected static final String MR_JOB_PRIORITY = "jobPriority"; + protected static final String IGNORE = "IGNORE"; private static final Object USER_JMS_NOTIFICATION_ENABLED = "userJMSNotificationEnabled"; @@ -111,84 +105,36 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, "coordinator.xml")); } - protected Properties createCoordDefaultConfiguration(Cluster cluster, - String coordName) throws FalconException { + protected Properties createCoordDefaultConfiguration(String coordName) throws FalconException { Properties props = new Properties(); - props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName()); - props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name()); - props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName()); props.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME_EL); props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL); - props.put("falconDataOperation", getOperation().name()); - - props.put(WorkflowExecutionArgs.LOG_DIR.getName(), - getStoragePath(EntityUtil.getLogPath(cluster, entity))); props.put(OozieClient.EXTERNAL_ID, new ExternalId(entity.getName(), 
EntityUtil.getWorkflowNameTag(coordName, entity), "${coord:nominalTime()}").getId()); - props.put(WorkflowExecutionArgs.WF_ENGINE_URL.getName(), ClusterHelper.getOozieUrl(cluster)); - - addLateDataProperties(props); - addBrokerProperties(cluster, props); - - props.put(MR_QUEUE_NAME, "default"); - props.put(MR_JOB_PRIORITY, "NORMAL"); props.put(USER_JMS_NOTIFICATION_ENABLED, "true"); - //props in entity override the set props. - props.putAll(getEntityProperties(entity)); return props; } - protected abstract WorkflowExecutionContext.EntityOperations getOperation(); - - private void addLateDataProperties(Properties props) throws FalconException { - if (EntityUtil.getLateProcess(entity) == null - || EntityUtil.getLateProcess(entity).getLateInputs() == null - || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) { - props.put("shouldRecord", "false"); - } else { - props.put("shouldRecord", "true"); - } + public final Properties build(Cluster cluster, Path buildPath) throws FalconException { + throw new IllegalStateException("Not implemented for coordinator!"); } - private void addBrokerProperties(Cluster cluster, Properties props) { - props.put(WorkflowExecutionArgs.USER_BRKR_URL.getName(), - ClusterHelper.getMessageBrokerUrl(cluster)); - props.put(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), - ClusterHelper.getMessageBrokerImplClass(cluster)); - - String falconBrokerUrl = StartupProperties.get().getProperty( - "broker.url", "tcp://localhost:61616?daemon=true"); - props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl); - - String falconBrokerImplClass = StartupProperties.get().getProperty( - "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS); - props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass); + public abstract List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException; - String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins", - 
DEFAULT_BROKER_MSG_TTL.toString()); - props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL); + protected COORDINATORAPP unmarshal(String template) throws FalconException { + return unmarshal(template, OozieUtils.COORD_JAXB_CONTEXT, COORDINATORAPP.class); } protected CONFIGURATION getConfig(Properties props) { CONFIGURATION conf = new CONFIGURATION(); for (Entry<Object, Object> prop : props.entrySet()) { - Property confProp = new Property(); + CONFIGURATION.Property confProp = new CONFIGURATION.Property(); confProp.setName((String) prop.getKey()); confProp.setValue((String) prop.getValue()); conf.getProperty().add(confProp); } return conf; } - - public final Properties build(Cluster cluster, Path buildPath) throws FalconException { - throw new IllegalStateException("Not implemented for coordinator!"); - } - - public abstract List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException; - - protected COORDINATORAPP unmarshal(String template) throws FalconException { - return unmarshal(template, OozieUtils.COORD_JAXB_CONTEXT, COORDINATORAPP.class); - } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java index f00290e..9ca0ac1 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java @@ -25,6 +25,7 @@ import org.apache.falcon.entity.CatalogStorage; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Property; import 
org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Output; @@ -36,9 +37,11 @@ import org.apache.falcon.security.SecurityUtil; import org.apache.falcon.service.FalconPathFilter; import org.apache.falcon.service.SharedLibraryHostingService; import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.oozie.client.OozieClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +73,7 @@ public abstract class OozieEntityBuilder<T extends Entity> { public static final String ENTITY_PATH = "ENTITY_PATH"; public static final String ENTITY_NAME = "ENTITY_NAME"; + protected static final String IGNORE = "IGNORE"; private static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() { @Override @@ -117,12 +121,12 @@ public abstract class OozieEntityBuilder<T extends Entity> { } public static OozieEntityBuilder get(Entity entity) { - switch(entity.getEntityType()) { + switch (entity.getEntityType()) { case FEED: return new FeedBundleBuilder((Feed) entity); case PROCESS: - return new ProcessBundleBuilder((Process)entity); + return new ProcessBundleBuilder((Process) entity); default: } @@ -145,6 +149,7 @@ public abstract class OozieEntityBuilder<T extends Entity> { FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( outPath.toUri(), ClusterHelper.getConfiguration(cluster)); OutputStream out = fs.create(outPath); + try { marshaller.marshal(jaxbElement, out); } finally { @@ -158,11 +163,24 @@ public abstract class OozieEntityBuilder<T extends Entity> { } } + protected Properties createAppProperties(Cluster cluster, String wfName) throws FalconException { + Properties properties = getEntityProperties(cluster); + properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster)); + 
properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster)); + properties.setProperty("colo.name", cluster.getColo()); + + properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true"); + properties.setProperty("falcon.libpath", + ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/lib"); + + return properties; + } + protected Properties getHiveCredentials(Cluster cluster) { String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster); if (metaStoreUrl == null) { throw new IllegalStateException( - "Registry interface is not defined in cluster: " + cluster.getName()); + "Registry interface is not defined in cluster: " + cluster.getName()); } Properties hiveCredentials = new Properties(); @@ -173,7 +191,7 @@ public abstract class OozieEntityBuilder<T extends Entity> { if (isSecurityEnabled) { String principal = ClusterHelper - .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL); + .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL); hiveCredentials.put(METASTORE_KERBEROS_PRINCIPAL, principal); hiveCredentials.put(METASTORE_USE_THRIFT_SASL, "true"); hiveCredentials.put("hcat.metastore.principal", principal); @@ -236,9 +254,9 @@ public abstract class OozieEntityBuilder<T extends Entity> { //pig and java actions require partition expression as "key1=val1, key2=val2" props.put(prefix + "_partitions_pig", - "${coord:dataOutPartitions('" + output.getName() + "')}"); + "${coord:dataOutPartitions('" + output.getName() + "')}"); props.put(prefix + "_partitions_java", - "${coord:dataOutPartitions('" + output.getName() + "')}"); + "${coord:dataOutPartitions('" + output.getName() + "')}"); //hive requires partition expression as "key1='val1', key2='val2'" (with quotes around values) //there is no direct EL expression in oozie @@ -246,7 +264,7 @@ public abstract class OozieEntityBuilder<T extends Entity> { for (String key : tableStorage.getDatedPartitionKeys()) { StringBuilder expr = 
new StringBuilder(); expr.append("${coord:dataOutPartitionValue('").append(output.getName()).append("', '").append(key) - .append("')}"); + .append("')}"); props.put(prefix + "_dated_partition_value_" + key, expr.toString()); partitions.add(key + "='" + expr + "'"); http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java index 49f9e07..f8220ec 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java @@ -37,6 +37,7 @@ import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder; import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder; import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder; import org.apache.falcon.oozie.workflow.ACTION; +import org.apache.falcon.oozie.workflow.CONFIGURATION; import org.apache.falcon.oozie.workflow.CREDENTIAL; import org.apache.falcon.oozie.workflow.CREDENTIALS; import org.apache.falcon.oozie.workflow.END; @@ -46,17 +47,23 @@ import org.apache.falcon.oozie.workflow.WORKFLOWAPP; import org.apache.falcon.security.SecurityUtil; import org.apache.falcon.util.OozieUtils; import org.apache.falcon.util.RuntimeProperties; +import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.WorkflowExecutionArgs; +import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import javax.xml.bind.JAXBElement; +import javax.xml.namespace.QName; import java.io.FileNotFoundException; import 
java.io.IOException; import java.io.OutputStream; import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; @@ -79,9 +86,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend private static final String PREPROCESS_TEMPLATE = "/action/pre-process.xml"; public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList( - new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, })); + new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, })); - private final LifeCycle lifecycle; + private LifeCycle lifecycle; + + protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L; + protected static final String MR_QUEUE_NAME = "queueName"; + protected static final String MR_JOB_PRIORITY = "jobPriority"; public OozieOrchestrationWorkflowBuilder(T entity, LifeCycle lifecycle) { super(entity); @@ -96,6 +107,10 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend return lifecycle.getTag(); } + public OozieOrchestrationWorkflowBuilder(T entity) { + super(entity); + } + public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle) throws FalconException { switch (entity.getEntityType()) { @@ -115,7 +130,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend default: throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() - + ", lifecycle " + lifecycle); + + ", lifecycle " + lifecycle); } case PROCESS: @@ -192,7 +207,19 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend protected Path marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException { return marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow), - 
OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml")); + OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml")); + } + + protected Path marshal(Cluster cluster, WORKFLOWAPP workflowapp, CONFIGURATION config, Path outPath) + throws FalconException { + QName workflowQName = new org.apache.falcon.oozie.workflow.ObjectFactory() + .createWorkflowApp(workflowapp).getName(); + JAXBElement<CONFIGURATION> configJaxbElement = + new JAXBElement(new QName(workflowQName.getNamespaceURI(), "configuration", workflowQName.getPrefix()), + CONFIGURATION.class, config); + + return marshal(cluster, configJaxbElement, OozieUtils.CONFIG_JAXB_CONTEXT, + new Path(outPath, "config-default.xml")); } protected WORKFLOWAPP unmarshal(String template) throws FalconException { @@ -212,13 +239,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException { String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext"; FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster)); + ClusterHelper.getConfiguration(cluster)); try { addExtensionJars(fs, new Path(libext), wf); addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf); if (tag != null) { addExtensionJars(fs, new Path(libext, entity.getEntityType().name() + "/" + tag.name().toLowerCase()), - wf); + wf); } } catch (IOException e) { throw new FalconException(e); @@ -316,7 +343,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend * @param cluster cluster entity */ protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster, String credentialName, - Set<String> actions) { + Set<String> actions) { addHCatalogCredentials(workflowApp, cluster, credentialName); // add credential to each action @@ -349,7 +376,7 @@ public abstract 
class OozieOrchestrationWorkflowBuilder<T extends Entity> extend credential.getProperty().add(createProperty("hcat.metastore.uri", metaStoreUrl)); credential.getProperty().add(createProperty("hcat.metastore.principal", - ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL))); + ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL))); return credential; } @@ -366,4 +393,69 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3")); action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1")); } + + public Properties createDefaultConfiguration(Cluster cluster) throws FalconException { + Properties props = new Properties(); + props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName()); + props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name()); + props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName()); + props.put("falconDataOperation", getOperation().name()); + + props.put(WorkflowExecutionArgs.LOG_DIR.getName(), + getStoragePath(EntityUtil.getLogPath(cluster, entity))); + props.put(WorkflowExecutionArgs.WF_ENGINE_URL.getName(), ClusterHelper.getOozieUrl(cluster)); + + addLateDataProperties(props); + addBrokerProperties(cluster, props); + + props.put(MR_QUEUE_NAME, "default"); + props.put(MR_JOB_PRIORITY, "NORMAL"); + + //props in entity override the set props. 
+ props.putAll(getEntityProperties(entity)); + props.putAll(createAppProperties(cluster, entity.getName())); + return props; + } + + private void addLateDataProperties(Properties props) throws FalconException { + if (EntityUtil.getLateProcess(entity) == null + || EntityUtil.getLateProcess(entity).getLateInputs() == null + || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) { + props.put("shouldRecord", "false"); + } else { + props.put("shouldRecord", "true"); + } + } + + private void addBrokerProperties(Cluster cluster, Properties props) { + props.put(WorkflowExecutionArgs.USER_BRKR_URL.getName(), + ClusterHelper.getMessageBrokerUrl(cluster)); + props.put(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), + ClusterHelper.getMessageBrokerImplClass(cluster)); + + String falconBrokerUrl = StartupProperties.get().getProperty( + "broker.url", "tcp://localhost:61616?daemon=true"); + props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl); + + String falconBrokerImplClass = StartupProperties.get().getProperty( + "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS); + props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass); + + String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins", + DEFAULT_BROKER_MSG_TTL.toString()); + props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL); + } + + protected abstract WorkflowExecutionContext.EntityOperations getOperation(); + + protected CONFIGURATION getConfig(Properties props) { + CONFIGURATION conf = new CONFIGURATION(); + for (Map.Entry<Object, Object> prop : props.entrySet()) { + CONFIGURATION.Property confProp = new CONFIGURATION.Property(); + confProp.setName((String) prop.getKey()); + confProp.setValue((String) prop.getValue()); + conf.getProperty().add(confProp); + } + return conf; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java 
---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java index 1d97204..0381e59 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java @@ -27,6 +27,7 @@ import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; import java.util.Arrays; +import java.util.Properties; /** * Builds replication workflow for filesystem based feed. @@ -71,4 +72,15 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder decorateWorkflow(workflow, wfName, start); return workflow; } + + protected Properties getWorkflowProperties(Feed feed) throws FalconException { + Properties props = super.getWorkflowProperties(feed); + if (entity.getAvailabilityFlag() == null) { + props.put("availabilityFlag", "NA"); + } else { + props.put("availabilityFlag", entity.getAvailabilityFlag()); + } + + return props; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java index de6f373..f5cc2c3 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java @@ -45,7 +45,6 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP; import org.apache.falcon.oozie.coordinator.SYNCDATASET; import org.apache.falcon.oozie.coordinator.WORKFLOW; 
import org.apache.falcon.oozie.coordinator.ACTION; -import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.conf.Configuration; @@ -73,8 +72,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F private static final String PARALLEL = "parallel"; private static final String TIMEOUT = "timeout"; - private static final String MR_MAX_MAPS = "maxMaps"; - private static final String MR_MAP_BANDWIDTH = "mapBandwidth"; private static final String ORDER = "order"; public FeedReplicationCoordinatorBuilder(Feed entity) { @@ -101,18 +98,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F return null; } - @Override - protected WorkflowExecutionContext.EntityOperations getOperation() { - return WorkflowExecutionContext.EntityOperations.REPLICATE; - } - private Properties doBuild(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException { - - // Different workflow for each source since hive credentials vary for each cluster - OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, trgCluster, - Tag.REPLICATION); - Properties wfProps = builder.build(trgCluster, buildPath); - long replicationDelayInMillis = getReplicationDelayInMillis(srcCluster); Date sourceStartDate = getStartDate(srcCluster, replicationDelayInMillis); Date sourceEndDate = getEndDate(srcCluster); @@ -127,6 +113,11 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F return null; } + // Different workflow for each source since hive credentials vary for each cluster + OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, trgCluster, + Tag.REPLICATION); + Properties wfProps = builder.build(trgCluster, buildPath); + COORDINATORAPP coord = unmarshal(REPLICATION_COORD_TEMPLATE); String coordName = 
EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(srcCluster.getName()), @@ -155,24 +146,18 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F } private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path buildPath, - String wfName, Storage sourceStorage, Storage targetStorage) throws FalconException { + String wfName, Storage sourceStorage, Storage targetStorage) throws FalconException { ACTION action = new ACTION(); WORKFLOW workflow = new WORKFLOW(); workflow.setAppPath(getStoragePath(buildPath)); - Properties props = createCoordDefaultConfiguration(trgCluster, wfName); + Properties props = createCoordDefaultConfiguration(wfName); // Override CLUSTER_NAME property to include both source and target cluster pair String clusterProperty = trgCluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName(); props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty); props.put("srcClusterName", srcCluster.getName()); props.put("srcClusterColo", srcCluster.getColo()); - if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden - props.put(MR_MAX_MAPS, getDefaultMaxMaps()); - } - if (props.get(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden - props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth()); - } // the storage type is uniform across source and target feeds for replication props.put("falconFeedStorageType", sourceStorage.getType().name()); @@ -183,12 +168,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F instancePaths = pathsWithPartitions; propagateFileSystemCopyProperties(pathsWithPartitions, props); - - if (entity.getAvailabilityFlag() == null) { - props.put("availabilityFlag", "NA"); - } else { - props.put("availabilityFlag", entity.getAvailabilityFlag()); - } } else if (sourceStorage.getType() == Storage.TYPE.TABLE) { instancePaths = "${coord:dataIn('input')}"; final 
CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage; @@ -197,25 +176,17 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget"); propagateTableCopyProperties(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, props); setupHiveConfiguration(srcCluster, trgCluster, buildPath); - props.put("availabilityFlag", "NA"); } propagateLateDataProperties(instancePaths, sourceStorage.getType().name(), props); - props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle())); - + // Add the custom properties set in feed. Else, dryrun won't catch any missing props. + props.putAll(getEntityProperties(entity)); workflow.setConfiguration(getConfig(props)); action.setWorkflow(workflow); return action; } - private String getDefaultMaxMaps() { - return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5"); - } - - private String getDefaultMapBandwidth() { - return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100"); - } private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster) throws FalconException { String srcPart = FeedHelper.normalizePartitionExpression( http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java index aa936ad..fb41b96 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java @@ -23,6 +23,7 @@ import org.apache.falcon.LifeCycle; import org.apache.falcon.Tag; import 
org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; @@ -31,6 +32,8 @@ import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.CONFIGURATION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.util.RuntimeProperties; +import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.fs.Path; import java.util.Properties; @@ -41,6 +44,8 @@ import java.util.Properties; public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> { protected static final String REPLICATION_ACTION_TEMPLATE = "/action/feed/replication-action.xml"; protected static final String REPLICATION_ACTION_NAME = "replication"; + private static final String MR_MAX_MAPS = "maxMaps"; + private static final String MR_MAP_BANDWIDTH = "mapBandwidth"; public FeedReplicationWorkflowBuilder(Feed entity) { super(entity, LifeCycle.REPLICATION); @@ -56,8 +61,32 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW addLibExtensionsToWorkflow(cluster, workflow, Tag.REPLICATION); marshal(cluster, workflow, buildPath); - return getProperties(buildPath, wfName); + Properties props = getProperties(buildPath, wfName); + props.putAll(createDefaultConfiguration(cluster)); + if (EntityUtil.isTableStorageType(cluster, entity)) { + // todo: kludge send source hcat creds for coord dependency check to pass + props.putAll(getHiveCredentials(srcCluster)); + props.putAll(getHiveCredentials(cluster)); + } + props.putAll(getWorkflowProperties(entity)); + props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle())); + // Write out the config to config-default.xml + marshal(cluster, 
workflow, getConfig(props), buildPath); + return props; } + + protected Properties getWorkflowProperties(Feed feed) throws FalconException { + Properties props = FeedHelper.getFeedProperties(feed); + if (props.getProperty(MR_MAX_MAPS) == null) { // set default if user has not overridden + props.put(MR_MAX_MAPS, getDefaultMaxMaps()); + } + if (props.getProperty(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden + props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth()); + } + + return props; + } + protected ACTION addHDFSServersConfig(ACTION action, Cluster sourceCluster, Cluster targetCluster) { if (isSecurityEnabled) { // this is to ensure that the delegation tokens are checked out for both clusters @@ -70,4 +99,17 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW return action; } protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException; + + @Override + protected WorkflowExecutionContext.EntityOperations getOperation() { + return WorkflowExecutionContext.EntityOperations.REPLICATE; + } + + private String getDefaultMaxMaps() { + return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5"); + } + + private String getDefaultMapBandwidth() { + return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100"); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java index c896d5a..ce9ef9a 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java @@ 
-22,7 +22,6 @@ import org.apache.falcon.FalconException; import org.apache.falcon.LifeCycle; import org.apache.falcon.Tag; import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.v0.Frequency.TimeUnit; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.cluster.Cluster; @@ -33,8 +32,6 @@ import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; import org.apache.falcon.oozie.coordinator.ACTION; import org.apache.falcon.oozie.coordinator.COORDINATORAPP; import org.apache.falcon.oozie.coordinator.WORKFLOW; -import org.apache.falcon.workflow.WorkflowExecutionArgs; -import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.fs.Path; import java.util.Arrays; @@ -73,32 +70,15 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee } Path coordPath = getBuildPath(buildPath); - Properties props = createCoordDefaultConfiguration(cluster, coordName); - props.put("timeZone", entity.getTimezone().getID()); - props.put("frequency", entity.getFrequency().getTimeUnit().name()); - - final Storage storage = FeedHelper.createStorage(cluster, entity); - props.put("falconFeedStorageType", storage.getType().name()); - - String feedDataPath = storage.getUriTemplate(); - props.put("feedDataPath", - feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX)); - - props.put("limit", feedCluster.getRetention().getLimit().toString()); - - props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName()); - props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE); - - props.put("falconInputFeeds", entity.getName()); - props.put("falconInPaths", IGNORE); - - props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle())); + Properties props = createCoordDefaultConfiguration(coordName); WORKFLOW workflow = new WORKFLOW(); - Properties wfProp = 
OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.RETENTION).build(cluster, + Properties wfProps = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.RETENTION).build(cluster, coordPath); - workflow.setAppPath(getStoragePath(wfProp.getProperty(OozieEntityBuilder.ENTITY_PATH))); - props.putAll(wfProp); + workflow.setAppPath(getStoragePath(wfProps.getProperty(OozieEntityBuilder.ENTITY_PATH))); + props.putAll(getProperties(coordPath, coordName)); + // Add the custom properties set in feed. Else, dryrun won't catch any missing props. + props.putAll(getEntityProperties(entity)); workflow.setConfiguration(getConfig(props)); ACTION action = new ACTION(); action.setWorkflow(workflow); @@ -108,9 +88,4 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee Path marshalPath = marshal(cluster, coord, coordPath); return Arrays.asList(getProperties(marshalPath, coordName)); } - - @Override - protected WorkflowExecutionContext.EntityOperations getOperation() { - return WorkflowExecutionContext.EntityOperations.DELETE; - } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java index 51e081f..b56f0dd 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java @@ -22,11 +22,15 @@ import org.apache.falcon.FalconException; import org.apache.falcon.LifeCycle; import org.apache.falcon.Tag; import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.v0.cluster.Cluster; import 
org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.workflow.WorkflowExecutionArgs; +import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.fs.Path; import java.util.Properties; @@ -64,20 +68,47 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME); addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION); + Properties props = getProperties(buildPath, wfName); + props.putAll(getWorkflowProperties(cluster)); + props.putAll(createDefaultConfiguration(cluster)); + props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle())); + if (EntityUtil.isTableStorageType(cluster, entity)) { setupHiveCredentials(cluster, buildPath, workflow); + // todo: kludge send source hcat creds for coord dependency check to pass + props.putAll(getHiveCredentials(cluster)); } marshal(cluster, workflow, buildPath); - Properties props = getProperties(buildPath, wfName); - props.putAll(getWorkflowProperties()); + + // Write out the config to config-default.xml + marshal(cluster, workflow, getConfig(props), buildPath); return props; } - private Properties getWorkflowProperties() { + private Properties getWorkflowProperties(Cluster cluster) throws FalconException { Properties props = new Properties(); props.setProperty("srcClusterName", "NA"); props.setProperty("availabilityFlag", "NA"); + + props.put("timeZone", entity.getTimezone().getID()); + props.put("frequency", entity.getFrequency().getTimeUnit().name()); + + org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName()); + final Storage storage = FeedHelper.createStorage(cluster, entity); + props.put("falconFeedStorageType", storage.getType().name()); + + String feedDataPath = 
storage.getUriTemplate(); + props.put("feedDataPath", + feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX)); + + props.put("limit", feedCluster.getRetention().getLimit().toString()); + + props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName()); + props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE); + + props.put("falconInputFeeds", entity.getName()); + props.put("falconInPaths", IGNORE); return props; } @@ -110,4 +141,9 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil } } } + + @Override + protected WorkflowExecutionContext.EntityOperations getOperation() { + return WorkflowExecutionContext.EntityOperations.DELETE; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java index 72bbca4..347ddaf 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java @@ -27,6 +27,7 @@ import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; import java.util.Arrays; +import java.util.Properties; /** * Builds replication workflow for hcat based feed. 
@@ -135,4 +136,11 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild } } } + + protected Properties getWorkflowProperties(Feed feed) throws FalconException { + Properties props = super.getWorkflowProperties(feed); + props.put("availabilityFlag", "NA"); + + return props; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java index 60f9fe1..d6d42e1 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java @@ -36,7 +36,6 @@ import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.entity.v0.process.Output; import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.entity.v0.process.Workflow; import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.oozie.OozieCoordinatorBuilder; import org.apache.falcon.oozie.OozieEntityBuilder; @@ -51,7 +50,6 @@ import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS; import org.apache.falcon.oozie.coordinator.SYNCDATASET; import org.apache.falcon.oozie.coordinator.WORKFLOW; import org.apache.falcon.workflow.WorkflowExecutionArgs; -import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.fs.Path; import java.util.ArrayList; @@ -82,37 +80,31 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder< coord.setControls(controls); // Configuration - Properties props = createCoordDefaultConfiguration(cluster, coordName); + Properties 
props = createCoordDefaultConfiguration(coordName); initializeInputPaths(cluster, coord, props); // inputs initializeOutputPaths(cluster, coord, props); // outputs - Workflow processWorkflow = entity.getWorkflow(); - propagateUserWorkflowProperties(processWorkflow, props); - // create parent wf Properties wfProps = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT).build(cluster, coordPath); WORKFLOW wf = new WORKFLOW(); wf.setAppPath(getStoragePath(wfProps.getProperty(OozieEntityBuilder.ENTITY_PATH))); - props.putAll(wfProps); + // Add the custom properties set in the process. Else, dryrun won't catch any missing props. + props.putAll(getEntityProperties(entity)); wf.setConfiguration(getConfig(props)); // set coord action to parent wf org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION(); action.setWorkflow(wf); + coord.setAction(action); Path marshalPath = marshal(cluster, coord, coordPath); return Arrays.asList(getProperties(marshalPath, coordName)); } - @Override - protected WorkflowExecutionContext.EntityOperations getOperation() { - return WorkflowExecutionContext.EntityOperations.GENERATE; - } - private void initializeCoordAttributes(Cluster cluster, COORDINATORAPP coord, String coordName) { coord.setName(coordName); org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(entity, @@ -351,12 +343,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder< } //RESUME CHECKSTYLE CHECK ParameterNumberCheck - private void propagateUserWorkflowProperties(Workflow processWorkflow, Properties props) { - props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName( - processWorkflow.getName(), entity.getName())); - props.put("userWorkflowVersion", processWorkflow.getVersion()); - props.put("userWorkflowEngine", processWorkflow.getEngine().value()); - } + protected void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage, 
Properties props) { String prefix = "falcon_" + input.getName(); http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java index 8b18ecc..ac436ca 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java @@ -26,6 +26,7 @@ import org.apache.falcon.entity.CatalogStorage; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.ProcessHelper; import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; @@ -33,15 +34,18 @@ import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.entity.v0.process.Output; import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.entity.v0.process.Workflow; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.CONFIGURATION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.oozie.client.OozieClient; import java.io.IOException; import java.util.ArrayList; @@ -102,8 +106,21 @@ public abstract class 
ProcessExecutionWorkflowBuilder extends OozieOrchestration } marshal(cluster, wfApp, buildPath); - Properties props = getProperties(buildPath, wfName); + Properties props = createDefaultConfiguration(cluster); + props.putAll(getProperties(buildPath, wfName)); props.putAll(getWorkflowProperties()); + props.setProperty(OozieClient.APP_PATH, buildPath.toString()); + + //Add libpath + Path libPath = new Path(buildPath, "lib"); + copySharedLibs(cluster, libPath); + props.put(OozieClient.LIBPATH, libPath.toString()); + + Workflow processWorkflow = ((Process)(entity)).getWorkflow(); + propagateUserWorkflowProperties(processWorkflow, props); + + // Write out the config to config-default.xml + marshal(cluster, wfApp, getConfig(props), buildPath); return props; } @@ -251,4 +268,16 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration throw new FalconException("Error adding archive for custom jars under: " + libPath, e); } } + + private void propagateUserWorkflowProperties(Workflow processWorkflow, Properties props) { + props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName( + processWorkflow.getName(), entity.getName())); + props.put("userWorkflowVersion", processWorkflow.getVersion()); + props.put("userWorkflowEngine", processWorkflow.getEngine().value()); + } + + @Override + protected WorkflowExecutionContext.EntityOperations getOperation() { + return WorkflowExecutionContext.EntityOperations.GENERATE; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java index 0ae229c..149a7e6 100644 --- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java +++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java @@ -20,6 +20,7 @@ package org.apache.falcon.util; 
import org.apache.falcon.oozie.bundle.BUNDLEAPP; import org.apache.falcon.oozie.coordinator.COORDINATORAPP; import org.apache.falcon.oozie.hive.ACTION; +import org.apache.falcon.oozie.workflow.CONFIGURATION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; import org.apache.hadoop.conf.Configuration; import org.apache.xerces.dom.ElementNSImpl; @@ -43,6 +44,7 @@ public final class OozieUtils { public static final JAXBContext ACTION_JAXB_CONTEXT; public static final JAXBContext COORD_JAXB_CONTEXT; public static final JAXBContext BUNDLE_JAXB_CONTEXT; + public static final JAXBContext CONFIG_JAXB_CONTEXT; protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT; static { @@ -51,6 +53,7 @@ public final class OozieUtils { ACTION_JAXB_CONTEXT = JAXBContext.newInstance(org.apache.falcon.oozie.workflow.ACTION.class); COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class); BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class); + CONFIG_JAXB_CONTEXT = JAXBContext.newInstance(CONFIGURATION.class); HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance( org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName()); } catch (JAXBException e) { @@ -72,7 +75,7 @@ public final class OozieUtils { } @SuppressWarnings("unchecked") - public static JAXBElement<ACTION> unMarshalHiveAction(org.apache.falcon.oozie.workflow.ACTION wfAction) { + public static JAXBElement<ACTION> unMarshalHiveAction(org.apache.falcon.oozie.workflow.ACTION wfAction) { try { Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller(); unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler()); @@ -94,5 +97,4 @@ public final class OozieUtils { throw new RuntimeException("Unable to marshall hive action.", e); } } - } http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git 
a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java index b223447..2f7787d 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java @@ -201,9 +201,12 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { HashMap<String, String> props = getCoordProperties(coord); - verifyEntityProperties(feed, trgCluster, srcCluster, + verifyEntityProperties(trgCluster, srcCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, props); - verifyBrokerProperties(trgCluster, props); + + HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord); + verifyEntityProperties(feed, trgCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, wfProps); + verifyBrokerProperties(trgCluster, wfProps); // verify the replication param that feed replicator depends on String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed); @@ -226,15 +229,15 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}"); // verify workflow params - Assert.assertEquals(props.get("userWorkflowName"), "replication-policy"); - Assert.assertEquals(props.get("userWorkflowVersion"), "0.6"); - Assert.assertEquals(props.get("userWorkflowEngine"), "falcon"); + Assert.assertEquals(wfProps.get("userWorkflowName"), "replication-policy"); + Assert.assertEquals(wfProps.get("userWorkflowVersion"), "0.6"); + Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon"); // verify default params - Assert.assertEquals(props.get("queueName"), "default"); - Assert.assertEquals(props.get("jobPriority"), "NORMAL"); - Assert.assertEquals(props.get("maxMaps"), "5"); - Assert.assertEquals(props.get("mapBandwidth"), "100"); 
+ Assert.assertEquals(wfProps.get("queueName"), "default"); + Assert.assertEquals(wfProps.get("jobPriority"), "NORMAL"); + Assert.assertEquals(wfProps.get("maxMaps"), "5"); + Assert.assertEquals(wfProps.get("mapBandwidth"), "100"); assertLibExtensions(coord, "replication"); WORKFLOWAPP wf = getWorkflowapp(trgMiniDFS.getFileSystem(), coord); @@ -340,12 +343,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}"); Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}"); Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name()); - Assert.assertEquals(props.get("maxMaps"), "33"); - Assert.assertEquals(props.get("mapBandwidth"), "2"); - verifyEntityProperties(aFeed, aCluster, srcCluster, + verifyEntityProperties(aCluster, srcCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, props); - verifyBrokerProperties(trgCluster, props); + + HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord); + verifyEntityProperties(aFeed, aCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, wfProps); + verifyBrokerProperties(aCluster, wfProps); + + Assert.assertEquals(wfProps.get("maxMaps"), "33"); + Assert.assertEquals(wfProps.get("mapBandwidth"), "2"); } public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP workflow, boolean isTable) { @@ -484,9 +491,9 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord), wfPath.toString()); - verifyEntityProperties(tableFeed, trgCluster, srcCluster, - WorkflowExecutionContext.EntityOperations.REPLICATE, props); - verifyBrokerProperties(trgCluster, props); + HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord); + verifyEntityProperties(tableFeed, trgCluster, 
WorkflowExecutionContext.EntityOperations.REPLICATE, wfProps); + verifyBrokerProperties(trgCluster, wfProps); } private void assertReplicationHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException { @@ -592,10 +599,13 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { HashMap<String, String> props = getCoordProperties(coord); - String feedDataPath = props.get("feedDataPath"); - String storageType = props.get("falconFeedStorageType"); + HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); + + String feedDataPath = wfProps.get("feedDataPath"); + String storageType = wfProps.get("falconFeedStorageType"); // verify the param that feed evictor depends on + Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name()); final Storage storage = FeedHelper.createStorage(cluster, feed); @@ -609,8 +619,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { } // verify the post processing params - Assert.assertEquals(props.get("feedNames"), feed.getName()); - Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE"); + Assert.assertEquals(wfProps.get("feedNames"), feed.getName()); + Assert.assertEquals(wfProps.get("feedInstancePaths"), "IGNORE"); assertWorkflowRetries(getWorkflowapp(srcMiniDFS.getFileSystem(), coord)); @@ -651,8 +661,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { HashMap<String, String> props = getCoordProperties(coord); - String feedDataPath = props.get("feedDataPath"); - String storageType = props.get("falconFeedStorageType"); + HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); + + String feedDataPath = wfProps.get("feedDataPath"); + String storageType = wfProps.get("falconFeedStorageType"); // verify the param that feed evictor depends on Assert.assertEquals(storageType, Storage.TYPE.TABLE.name()); @@ -668,13 +680,13 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { } // verify the post processing params - 
Assert.assertEquals(props.get("feedNames"), tableFeed.getName()); - Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE"); + Assert.assertEquals(wfProps.get("feedNames"), tableFeed.getName()); + Assert.assertEquals(wfProps.get("feedInstancePaths"), "IGNORE"); assertWorkflowRetries(coord); - verifyBrokerProperties(srcCluster, props); + verifyBrokerProperties(srcCluster, wfProps); verifyEntityProperties(tableFeed, trgCluster, - WorkflowExecutionContext.EntityOperations.DELETE, props); + WorkflowExecutionContext.EntityOperations.DELETE, wfProps); Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster)); assertHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord), http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java index 6488682..ce76594 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java @@ -31,10 +31,11 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.oozie.bundle.BUNDLEAPP; -import org.apache.falcon.oozie.coordinator.CONFIGURATION; +import org.apache.falcon.oozie.workflow.CONFIGURATION; import org.apache.falcon.oozie.coordinator.COORDINATORAPP; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.util.OozieUtils; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.falcon.workflow.WorkflowExecutionContext; @@ -46,6 
+47,9 @@ import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBElement; import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; import javax.xml.transform.stream.StreamSource; import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; @@ -116,7 +120,7 @@ public class AbstractTestBase { Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-coordinator-0.3.xsd")); unmarshaller.setSchema(schema); JAXBElement<COORDINATORAPP> jaxbBundle = unmarshaller.unmarshal( - new StreamSource(new ByteArrayInputStream(coordStr.trim().getBytes())), COORDINATORAPP.class); + new StreamSource(new ByteArrayInputStream(coordStr.trim().getBytes())), COORDINATORAPP.class); return jaxbBundle.getValue(); } @@ -128,7 +132,7 @@ public class AbstractTestBase { Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-bundle-0.1.xsd")); unmarshaller.setSchema(schema); JAXBElement<BUNDLEAPP> jaxbBundle = unmarshaller.unmarshal( - new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), BUNDLEAPP.class); + new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), BUNDLEAPP.class); return jaxbBundle.getValue(); } @@ -153,7 +157,7 @@ public class AbstractTestBase { } protected void assertLibExtensions(FileSystem fs, COORDINATORAPP coord, EntityType type, - String lifecycle) throws Exception { + String lifecycle) throws Exception { WORKFLOWAPP wf = getWorkflowapp(fs, coord); List<Object> actions = wf.getDecisionOrForkOrJoin(); String lifeCyclePath = lifecycle == null ? 
"" : "/" + lifecycle; @@ -172,7 +176,7 @@ public class AbstractTestBase { } if (files != null) { Assert.assertTrue(files.get(files.size() - 1).endsWith( - "/projects/falcon/working/libext/" + type.name() + lifeCyclePath + "/ext.jar")); + "/projects/falcon/working/libext/" + type.name() + lifeCyclePath + "/ext.jar")); } } } @@ -209,36 +213,57 @@ public class AbstractTestBase { protected HashMap<String, String> getCoordProperties(COORDINATORAPP coord) { HashMap<String, String> props = new HashMap<String, String>(); - for (CONFIGURATION.Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) { + for (org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop + : coord.getAction().getWorkflow().getConfiguration().getProperty()) { props.put(prop.getName(), prop.getValue()); } return props; } - protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster, + protected HashMap<String, String> getWorkflowProperties(FileSystem fs, COORDINATORAPP coord) + throws JAXBException, IOException, XMLStreamException { + + String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); + StreamSource xml = new StreamSource(fs.open(new Path(wfPath + "/config-default.xml"))); + XMLInputFactory xif = XMLInputFactory.newFactory(); + XMLStreamReader xsr = xif.createXMLStreamReader(xml); + JAXBContext jaxbContext = OozieUtils.CONFIG_JAXB_CONTEXT; + CONFIGURATION jaxbConfig = ((JAXBElement<CONFIGURATION>) jaxbContext.createUnmarshaller(). 
+ unmarshal(xsr, CONFIGURATION.class)).getValue(); + + HashMap<String, String> props = new HashMap<String, String>(); + for (CONFIGURATION.Property prop : jaxbConfig.getProperty()) { + props.put(prop.getName(), prop.getValue()); + } + return props; + } + + protected void verifyEntityProperties(Cluster cluster, Cluster srcCluster, WorkflowExecutionContext.EntityOperations operation, HashMap<String, String> props) throws Exception { - Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()), - entity.getName()); - Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()), - entity.getEntityType().name()); if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) { Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName()); } else { Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName()); } - Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity)); - Assert.assertEquals(props.get("falconDataOperation"), operation.name()); } protected void verifyEntityProperties(Entity entity, Cluster cluster, WorkflowExecutionContext.EntityOperations operation, HashMap<String, String> props) throws Exception { - verifyEntityProperties(entity, cluster, null, operation, props); + Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()), + entity.getName()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()), + entity.getEntityType().name()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity)); + Assert.assertEquals(props.get("falconDataOperation"), operation.name()); + Assert.assertTrue(props.containsKey(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName())); + Assert.assertTrue(props.containsKey(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName())); + 
Assert.assertTrue(props.containsKey(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName())); } - private String getLogPath(Cluster cluster, Entity entity) { + protected String getLogPath(Cluster cluster, Entity entity) { Path logPath = EntityUtil.getLogPath(cluster, entity); return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath; } http://git-wip-us.apache.org/repos/asf/falcon/blob/9f69ae27/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java index 4e5c3f0..3aaf304 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java @@ -188,14 +188,15 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA)); HashMap<String, String> props = getCoordProperties(coord); - assertEquals(props.get("mapred.job.priority"), "LOW"); + HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); + assertEquals(wfProps.get("mapred.job.priority"), "LOW"); List<Input> inputs = process.getInputs().getInputs(); assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), inputs.get(0).getName() + "#" + inputs .get(1).getName()); verifyEntityProperties(process, cluster, - WorkflowExecutionContext.EntityOperations.GENERATE, props); - verifyBrokerProperties(cluster, props); + WorkflowExecutionContext.EntityOperations.GENERATE, wfProps); + verifyBrokerProperties(cluster, wfProps); assertLibExtensions(fs, coord, EntityType.PROCESS, null); } @@ -285,10 +286,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { 
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); HashMap<String, String> props = getCoordProperties(coord); + HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); verifyEntityProperties(process, cluster, - WorkflowExecutionContext.EntityOperations.GENERATE, props); - verifyBrokerProperties(cluster, props); + WorkflowExecutionContext.EntityOperations.GENERATE, wfProps); + verifyBrokerProperties(cluster, wfProps); // verify table and hive props Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process); @@ -347,11 +349,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", ""); COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); - HashMap<String, String> props = getCoordProperties(coord); + HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); verifyEntityProperties(process, cluster, - WorkflowExecutionContext.EntityOperations.GENERATE, props); - verifyBrokerProperties(cluster, props); + WorkflowExecutionContext.EntityOperations.GENERATE, wfProps); + verifyBrokerProperties(cluster, wfProps); String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml")); @@ -401,11 +403,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", ""); COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); - HashMap<String, String> props = getCoordProperties(coord); + HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); verifyEntityProperties(process, cluster, - WorkflowExecutionContext.EntityOperations.GENERATE, props); - verifyBrokerProperties(cluster, props); + WorkflowExecutionContext.EntityOperations.GENERATE, wfProps); + verifyBrokerProperties(cluster, wfProps); 
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml")); @@ -451,10 +453,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", ""); COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); - HashMap<String, String> props = getCoordProperties(coord); + HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); verifyEntityProperties(process, cluster, - WorkflowExecutionContext.EntityOperations.GENERATE, props); - verifyBrokerProperties(cluster, props); + WorkflowExecutionContext.EntityOperations.GENERATE, wfProps); + verifyBrokerProperties(cluster, wfProps); String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml")); @@ -550,9 +552,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(entry.getValue(), expected.get(entry.getKey())); } } + + HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); verifyEntityProperties(process, cluster, - WorkflowExecutionContext.EntityOperations.GENERATE, props); - verifyBrokerProperties(cluster, props); + WorkflowExecutionContext.EntityOperations.GENERATE, wfProps); + verifyBrokerProperties(cluster, wfProps); // verify the late data params Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed()); @@ -684,9 +688,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); HashMap<String, String> props = getCoordProperties(coord); + HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); verifyEntityProperties(processEntity, cluster, - WorkflowExecutionContext.EntityOperations.GENERATE, props); - 
verifyBrokerProperties(cluster, props); + WorkflowExecutionContext.EntityOperations.GENERATE, wfProps); + verifyBrokerProperties(cluster, wfProps); String[] expected = { WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), @@ -694,9 +699,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), WorkflowExecutionArgs.INPUT_NAMES.getName(), - WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), - WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), - WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), }; for (String property : expected) { @@ -726,9 +728,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); HashMap<String, String> props = getCoordProperties(coord); + HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); verifyEntityProperties(processEntity, cluster, - WorkflowExecutionContext.EntityOperations.GENERATE, props); - verifyBrokerProperties(cluster, props); + WorkflowExecutionContext.EntityOperations.GENERATE, wfProps); + verifyBrokerProperties(cluster, wfProps); Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "clicks"); Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "IGNORE"); @@ -756,9 +759,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath)); HashMap<String, String> props = getCoordProperties(coord); + HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); verifyEntityProperties(processEntity, cluster, - WorkflowExecutionContext.EntityOperations.GENERATE, props); - verifyBrokerProperties(cluster, props); + WorkflowExecutionContext.EntityOperations.GENERATE, wfProps); + verifyBrokerProperties(cluster, wfProps); 
Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "impressions"); Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "NONE");
