Repository: falcon Updated Branches: refs/heads/master 5a3e1d66f -> 77910aefd
http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/main/resources/deploy.properties ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/deploy.properties b/unit/src/main/resources/deploy.properties new file mode 100644 index 0000000..7ad5007 --- /dev/null +++ b/unit/src/main/resources/deploy.properties @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Application deployment properties, particularly relating to whether the server is in embedded mode or distributed mode. +*.domain=all +*.deploy.mode=embedded \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/main/resources/localoozie-log4j.properties ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/localoozie-log4j.properties b/unit/src/main/resources/localoozie-log4j.properties new file mode 100644 index 0000000..84b2d7a --- /dev/null +++ b/unit/src/main/resources/localoozie-log4j.properties @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.appender.oozie=org.apache.log4j.ConsoleAppender +log4j.appender.oozie.Target=System.out +log4j.appender.oozie.layout=org.apache.log4j.PatternLayout +log4j.appender.oozie.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n + +log4j.appender.null=org.apache.log4j.varia.NullAppender + +log4j.logger.org.apache=INFO, oozie +log4j.logger.org.mortbay=WARN, oozie +log4j.logger.org.hsqldb=WARN, oozie + +log4j.logger.opslog=NONE, null +log4j.logger.applog=NONE, null +log4j.logger.instrument=NONE, null + +log4j.logger.a=ALL, null \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/main/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/log4j.xml b/unit/src/main/resources/log4j.xml new file mode 100644 index 0000000..a161eb0 --- /dev/null +++ b/unit/src/main/resources/log4j.xml @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="console" class="org.apache.log4j.ConsoleAppender"> + <param name="Target" value="System.out"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/> + </layout> + </appender> + + <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender"> + <param name="File" value="${user.dir}/target/logs/application.log"/> + <param name="Append" value="true"/> + <param name="Threshold" value="debug"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/> + </layout> + </appender> + + <appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender"> + <param name="File" value="${user.dir}/target/logs/audit.log"/> + <param name="Append" value="true"/> + <param name="Threshold" value="debug"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %x %m%n"/> + </layout> + </appender> + + <appender name="METRIC" class="org.apache.log4j.DailyRollingFileAppender"> + <param name="File" value="${user.dir}/target/logs/metric.log"/> + <param name="Append" value="true"/> + <param name="Threshold" value="debug"/> + <layout class="org.apache.log4j.PatternLayout"> + <param 
name="ConversionPattern" value="%d %m%n"/> + </layout> + </appender> + + <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender"> + <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/> + <param name="Append" value="true"/> + <param name="Threshold" value="debug"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %m%n"/> + </layout> + </appender> + + <logger name="org.apache.falcon" additivity="false"> + <level value="info"/> + <appender-ref ref="console"/> + </logger> + + <logger name="org.apache.oozie" additivity="false"> + <level value="info"/> + <appender-ref ref="console"/> + </logger> + + <logger name="AUDIT"> + <level value="info"/> + <appender-ref ref="AUDIT"/> + </logger> + + <logger name="METRIC"> + <level value="info"/> + <appender-ref ref="METRIC"/> + </logger> + + <root> + <priority value="info"/> + <appender-ref ref="console"/> + </root> + +</log4j:configuration> http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/main/resources/mapred-site.xml ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/mapred-site.xml b/unit/src/main/resources/mapred-site.xml new file mode 100644 index 0000000..f60981e --- /dev/null +++ b/unit/src/main/resources/mapred-site.xml @@ -0,0 +1,35 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- Put site-specific property overrides in this file. --> + +<configuration> + + <property> + <name>mapreduce.framework.name</name> + <value>unit</value> + </property> + + <property> + <name>mapreduce.jobtracker.system.dir</name> + <value>/tmp</value> + </property> + +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/main/resources/oozie-site.xml ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/oozie-site.xml b/unit/src/main/resources/oozie-site.xml new file mode 100644 index 0000000..23d41eb --- /dev/null +++ b/unit/src/main/resources/oozie-site.xml @@ -0,0 +1,170 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<configuration> + <property> + <name>oozie.service.HadoopAccessorService.supported.filesystems</name> + <value>hdfs,hftp,webhdfs,jail</value> + </property> + <property> + <name>oozie.service.JPAService.create.db.schema</name> + <value>true</value> + </property> + <property> + <name>oozie.service.ELService.ext.functions.coord-job-submit-instances</name> + <value> + now=org.apache.oozie.extensions.OozieELExtensions#ph1_now_echo, + today=org.apache.oozie.extensions.OozieELExtensions#ph1_today_echo, + yesterday=org.apache.oozie.extensions.OozieELExtensions#ph1_yesterday_echo, + currentWeek=org.apache.oozie.extensions.OozieELExtensions#ph1_currentWeek_echo, + lastWeek=org.apache.oozie.extensions.OozieELExtensions#ph1_lastWeek_echo, + currentMonth=org.apache.oozie.extensions.OozieELExtensions#ph1_currentMonth_echo, + lastMonth=org.apache.oozie.extensions.OozieELExtensions#ph1_lastMonth_echo, + currentYear=org.apache.oozie.extensions.OozieELExtensions#ph1_currentYear_echo, + lastYear=org.apache.oozie.extensions.OozieELExtensions#ph1_lastYear_echo, + formatTime=org.apache.oozie.coord.CoordELFunctions#ph1_coord_formatTime_echo, + latest=org.apache.oozie.coord.CoordELFunctions#ph2_coord_latest_echo, + future=org.apache.oozie.coord.CoordELFunctions#ph2_coord_future_echo + </value> + <description> + EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD. + This property is a convenience property to add extensions to the built in executors without having to + include all the built in ones. 
+ </description> + </property> + <property> + <name>oozie.service.ELService.ext.functions.coord-action-create-inst</name> + <value> + now=org.apache.oozie.extensions.OozieELExtensions#ph2_now_inst, + today=org.apache.oozie.extensions.OozieELExtensions#ph2_today_inst, + yesterday=org.apache.oozie.extensions.OozieELExtensions#ph2_yesterday_inst, + currentWeek=org.apache.oozie.extensions.OozieELExtensions#ph2_currentWeek_inst, + lastWeek=org.apache.oozie.extensions.OozieELExtensions#ph2_lastWeek_inst, + currentMonth=org.apache.oozie.extensions.OozieELExtensions#ph2_currentMonth_inst, + lastMonth=org.apache.oozie.extensions.OozieELExtensions#ph2_lastMonth_inst, + currentYear=org.apache.oozie.extensions.OozieELExtensions#ph2_currentYear_inst, + lastYear=org.apache.oozie.extensions.OozieELExtensions#ph2_lastYear_inst, + latest=org.apache.oozie.coord.CoordELFunctions#ph2_coord_latest_echo, + future=org.apache.oozie.coord.CoordELFunctions#ph2_coord_future_echo, + formatTime=org.apache.oozie.coord.CoordELFunctions#ph2_coord_formatTime, + user=org.apache.oozie.coord.CoordELFunctions#coord_user + </value> + <description> + EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD. + This property is a convenience property to add extensions to the built in executors without having to + include all the built in ones. 
+ </description> + </property> + <property> + <name>oozie.service.ELService.ext.functions.coord-action-create</name> + <value> + now=org.apache.oozie.extensions.OozieELExtensions#ph2_now, + today=org.apache.oozie.extensions.OozieELExtensions#ph2_today, + yesterday=org.apache.oozie.extensions.OozieELExtensions#ph2_yesterday, + currentWeek=org.apache.oozie.extensions.OozieELExtensions#ph2_currentWeek, + lastWeek=org.apache.oozie.extensions.OozieELExtensions#ph2_lastWeek, + currentMonth=org.apache.oozie.extensions.OozieELExtensions#ph2_currentMonth, + lastMonth=org.apache.oozie.extensions.OozieELExtensions#ph2_lastMonth, + currentYear=org.apache.oozie.extensions.OozieELExtensions#ph2_currentYear, + lastYear=org.apache.oozie.extensions.OozieELExtensions#ph2_lastYear, + latest=org.apache.oozie.coord.CoordELFunctions#ph2_coord_latest_echo, + future=org.apache.oozie.coord.CoordELFunctions#ph2_coord_future_echo, + formatTime=org.apache.oozie.coord.CoordELFunctions#ph2_coord_formatTime, + user=org.apache.oozie.coord.CoordELFunctions#coord_user + </value> + <description> + EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD. + This property is a convenience property to add extensions to the built in executors without having to + include all the built in ones. 
+ </description> + </property> + <property> + <name>oozie.service.ELService.ext.functions.coord-job-submit-data</name> + <value> + now=org.apache.oozie.extensions.OozieELExtensions#ph1_now_echo, + today=org.apache.oozie.extensions.OozieELExtensions#ph1_today_echo, + yesterday=org.apache.oozie.extensions.OozieELExtensions#ph1_yesterday_echo, + currentWeek=org.apache.oozie.extensions.OozieELExtensions#ph1_currentWeek_echo, + lastWeek=org.apache.oozie.extensions.OozieELExtensions#ph1_lastWeek_echo, + currentMonth=org.apache.oozie.extensions.OozieELExtensions#ph1_currentMonth_echo, + lastMonth=org.apache.oozie.extensions.OozieELExtensions#ph1_lastMonth_echo, + currentYear=org.apache.oozie.extensions.OozieELExtensions#ph1_currentYear_echo, + lastYear=org.apache.oozie.extensions.OozieELExtensions#ph1_lastYear_echo, + dataIn=org.apache.oozie.extensions.OozieELExtensions#ph1_dataIn_echo, + instanceTime=org.apache.oozie.coord.CoordELFunctions#ph1_coord_nominalTime_echo_wrap, + formatTime=org.apache.oozie.coord.CoordELFunctions#ph1_coord_formatTime_echo, + dateOffset=org.apache.oozie.coord.CoordELFunctions#ph1_coord_dateOffset_echo, + user=org.apache.oozie.coord.CoordELFunctions#coord_user + </value> + <description> + EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD. + This property is a convenience property to add extensions to the built in executors without having to + include all the built in ones. 
+ </description> + </property> + <property> + <name>oozie.service.ELService.ext.functions.coord-action-start</name> + <value> + now=org.apache.oozie.extensions.OozieELExtensions#ph2_now, + today=org.apache.oozie.extensions.OozieELExtensions#ph2_today, + yesterday=org.apache.oozie.extensions.OozieELExtensions#ph2_yesterday, + currentWeek=org.apache.oozie.extensions.OozieELExtensions#ph2_currentWeek, + lastWeek=org.apache.oozie.extensions.OozieELExtensions#ph2_lastWeek, + currentMonth=org.apache.oozie.extensions.OozieELExtensions#ph2_currentMonth, + lastMonth=org.apache.oozie.extensions.OozieELExtensions#ph2_lastMonth, + currentYear=org.apache.oozie.extensions.OozieELExtensions#ph2_currentYear, + lastYear=org.apache.oozie.extensions.OozieELExtensions#ph2_lastYear, + latest=org.apache.oozie.coord.CoordELFunctions#ph3_coord_latest, + future=org.apache.oozie.coord.CoordELFunctions#ph3_coord_future, + dataIn=org.apache.oozie.extensions.OozieELExtensions#ph3_dataIn, + instanceTime=org.apache.oozie.coord.CoordELFunctions#ph3_coord_nominalTime, + dateOffset=org.apache.oozie.coord.CoordELFunctions#ph3_coord_dateOffset, + formatTime=org.apache.oozie.coord.CoordELFunctions#ph3_coord_formatTime, + user=org.apache.oozie.coord.CoordELFunctions#coord_user + </value> + <description> + EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD. + This property is a convenience property to add extensions to the built in executors without having to + include all the built in ones. + </description> + </property> + <property> + <name>oozie.service.ELService.ext.functions.coord-sla-submit</name> + <value> + instanceTime=org.apache.oozie.coord.CoordELFunctions#ph1_coord_nominalTime_echo_fixed, + user=org.apache.oozie.coord.CoordELFunctions#coord_user + </value> + <description> + EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD. 
+ </description> + </property> + <property> + <name>oozie.service.ELService.ext.functions.coord-sla-create</name> + <value> + instanceTime=org.apache.oozie.coord.CoordELFunctions#ph2_coord_nominalTime, + user=org.apache.oozie.coord.CoordELFunctions#coord_user + </value> + <description> + EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD. + </description> + </property> + <property> + <name>oozie.service.coord.check.maximum.frequency</name> + <value>false</value> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/startup.properties b/unit/src/main/resources/startup.properties new file mode 100644 index 0000000..4207ab9 --- /dev/null +++ b/unit/src/main/resources/startup.properties @@ -0,0 +1,129 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +*.domain=debug + +######### Implementation classes ######### +## DONT MODIFY UNLESS SURE ABOUT CHANGE ## + +*.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine +*.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder +*.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder +*.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager +*.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService +*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager +*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService + +##### Falcon Services ##### +*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\ + org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ + org.apache.falcon.service.ProcessSubscriberService,\ + org.apache.falcon.entity.store.ConfigurationStore,\ + org.apache.falcon.rerun.service.RetryService,\ + org.apache.falcon.rerun.service.LateRunService,\ + +##### Falcon Configuration Store Change listeners ##### +*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ + org.apache.falcon.entity.ColoClusterRelation,\ + org.apache.falcon.group.FeedGroupMap,\ + org.apache.falcon.entity.store.FeedLocationStore + +##### JMS MQ Broker Implementation class ##### +*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory + + +######### System startup parameters ######### + +# Location to store user entity configurations +debug.config.store.uri=file://${user.dir}/target/store +debug.config.store.persist=false +debug.config.oozie.conf.uri=${user.dir}/target/oozie +debug.system.lib.location=${system.lib.location} +debug.broker.url=vm://localhost +debug.retry.recorder.path=${user.dir}/target/retry +debug.libext.feed.retention.paths=${falcon.libext} +debug.libext.feed.replication.paths=${falcon.libext} +debug.libext.process.paths=${falcon.libext} + 
+*.falcon.cleanup.service.frequency=minutes(5) + + +######### Properties for configuring JMS provider - activemq ######### +# Default Active MQ url +*.broker.url=tcp://localhost:61616 + +# default time-to-live for a JMS message 3 days (time in minutes) +*.broker.ttlInMins=4320 +*.entity.topic=FALCON.ENTITY.TOPIC +*.max.retry.failure.count=1 +*.retry.recorder.path=${user.dir}/logs/retry + +######### Properties for configuring iMon client and metric ######### +*.internal.queue.size=1000 + + +##### List of shared libraries for Falcon workflows ##### +*.shared.libs=activemq-core,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3 + +######### Authentication Properties ######### + +# Authentication type must be specified: simple|kerberos +*.falcon.authentication.type=simple + +##### Service Configuration + +# Indicates the Kerberos principal to be used in Falcon Service. +*.falcon.service.authentication.kerberos.principal= + +# Location of the keytab file with the credentials for the Service principal. +*.falcon.service.authentication.kerberos.keytab= + +# name node principal to talk to config store +*.dfs.namenode.kerberos.principal= + +##### SPNEGO Configuration + +# Authentication type must be specified: simple|kerberos|<class> +# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility +*.falcon.http.authentication.type=simple + +# Indicates how long (in seconds) an authentication token is valid before it has to be renewed. +*.falcon.http.authentication.token.validity=36000 + +# The signature secret for signing the authentication tokens. +*.falcon.http.authentication.signature.secret=falcon + +# The domain to use for the HTTP cookie that stores the authentication token. +*.falcon.http.authentication.cookie.domain= + +# Indicates if anonymous requests are allowed when using 'simple' authentication. 
+*.falcon.http.authentication.simple.anonymous.allowed=false + +# Indicates the Kerberos principal to be used for HTTP endpoint. +# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification. +*.falcon.http.authentication.kerberos.principal= + +# Location of the keytab file with the credentials for the HTTP principal. +*.falcon.http.authentication.kerberos.keytab= + +# The Kerberos name rules used to resolve Kerberos principal names; refer to Hadoop's KerberosName for more details. +*.falcon.http.authentication.kerberos.name.rules=DEFAULT + +# Comma separated list of black listed users +*.falcon.http.authentication.blacklisted.users= + http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java new file mode 100644 index 0000000..bd03efb --- /dev/null +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.unit; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.client.FalconCLIException; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.Storage; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.expression.ExpressionHelper; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.hadoop.JailedFileSystem; +import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.InstancesResult; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeClass; + +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.InputStream; +import java.io.BufferedReader; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.text.ParseException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.TimeZone; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Test Utility for Local Falcon Unit. + */ +public class FalconUnitTestBase { + + /** + * Perform a predicate evaluation. + * + * @return the boolean result of the evaluation. + * @throws Exception thrown if the predicate evaluation could not evaluate. 
+ */ + public interface Predicate { + + boolean evaluate() throws Exception; + } + + private static final Logger LOG = LoggerFactory.getLogger(FalconUnitTestBase.class); + + private static final String DEFAULT_CLUSTER = "local"; + private static final String DEFAULT_COLO = "local"; + private static final String CLUSTER = "cluster"; + private static final String COLO = "colo"; + private static final String CLUSTER_TEMPLATE = "/cluster-template.xml"; + private static final String STAGING_PATH = "/projects/falcon/staging"; + private static final String WORKING_PATH = "/projects/falcon/working"; + + public static final Pattern VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##"); + protected static FalconUnitClient falconUnitClient; + protected static JailedFileSystem fs; + protected static ConfigurationStore configStore; + + @BeforeClass + public void setup() throws FalconException, IOException { + FalconUnit.start(true); + falconUnitClient = FalconUnit.getClient(); + fs = (JailedFileSystem) FalconUnit.getFileSystem(); + configStore = falconUnitClient.getConfigStore(); + } + + @AfterClass + public void cleanup() throws Exception { + fs.delete(new Path(STAGING_PATH), true); + fs.delete(new Path(WORKING_PATH), true); + FalconUnit.cleanup(); + } + + @AfterTest + public void cleanUpActionXml() throws IOException { + //Needed since oozie writes action xml to current directory. 
+ FileUtils.deleteQuietly(new File("action.xml")); + FileUtils.deleteQuietly(new File(".action.xml.crc")); + } + + protected FalconUnitClient getClient() throws FalconException { + return FalconUnit.getClient(); + } + + protected JailedFileSystem getFileSystem() throws IOException { + return fs; + } + + public boolean submitCluster(String colo, String cluster, + Map<String, String> props) throws IOException, FalconCLIException { + props = updateColoAndCluster(colo, cluster, props); + fs.mkdirs(new Path(STAGING_PATH), HadoopClientFactory.ALL_PERMISSION); + fs.mkdirs(new Path(WORKING_PATH), HadoopClientFactory.READ_EXECUTE_PERMISSION); + String clusterXmlPath = overlayParametersOverTemplate(CLUSTER_TEMPLATE, props); + APIResult result = falconUnitClient.submit(CLUSTER, clusterXmlPath); + return true ? APIResult.Status.SUCCEEDED.equals(result.getStatus()) : false; + } + + public boolean submitCluster() throws IOException, FalconCLIException { + return submitCluster(DEFAULT_COLO, DEFAULT_CLUSTER, null); + } + + public APIResult submit(EntityType entityType, String filePath) throws FalconCLIException, IOException { + return submit(entityType.toString(), filePath); + } + + public APIResult submit(String entityType, String filePath) throws FalconCLIException, IOException { + return falconUnitClient.submit(entityType, filePath); + } + + public APIResult submitProcess(String filePath, String appDirectory) throws IOException, FalconCLIException { + createDir(appDirectory); + return submit(EntityType.PROCESS, filePath); + } + + public APIResult scheduleProcess(String processName, String startTime, int numInstances, + String cluster, String localWfPath) throws FalconException, + IOException, FalconCLIException { + Process processEntity = configStore.get(EntityType.PROCESS, processName); + if (processEntity == null) { + throw new FalconException("Process not found " + processName); + } + String workflowPath = processEntity.getWorkflow().getPath(); + fs.copyFromLocalFile(new 
Path(localWfPath), new Path(workflowPath)); + return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster); + } + + public APIResult scheduleProcess(String processName, String startTime, int numInstances, + String cluster) throws FalconException, FalconCLIException { + Process processEntity = configStore.get(EntityType.PROCESS, processName); + if (processEntity == null) { + throw new FalconException("Process not found " + processName); + } + return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster); + } + + private Map<String, String> updateColoAndCluster(String colo, String cluster, Map<String, String> props) { + if (props == null) { + props = new HashMap<>(); + } + String coloProp = StringUtils.isEmpty(colo) ? DEFAULT_COLO : colo; + props.put(COLO, coloProp); + + String clusterProp = StringUtils.isEmpty(cluster) ? DEFAULT_CLUSTER : cluster; + props.put(CLUSTER, clusterProp); + + return props; + } + + public static String overlayParametersOverTemplate(String template, + Map<String, String> overlay) throws IOException { + File tmpFile = getTempFile(); + OutputStream out = new FileOutputStream(tmpFile); + + InputStreamReader in; + InputStream resourceAsStream = FalconUnitTestBase.class.getResourceAsStream(template); + if (resourceAsStream == null) { + in = new FileReader(template); + } else { + in = new InputStreamReader(resourceAsStream); + } + BufferedReader reader = new BufferedReader(in); + String line; + while ((line = reader.readLine()) != null) { + Matcher matcher = VAR_PATTERN.matcher(line); + while (matcher.find()) { + String variable = line.substring(matcher.start(), matcher.end()); + line = line.replace(variable, overlay.get(variable.substring(2, variable.length() - 2))); + matcher = VAR_PATTERN.matcher(line); + } + out.write(line.getBytes()); + out.write("\n".getBytes()); + } + reader.close(); + out.close(); + return tmpFile.getAbsolutePath(); + } + + + public static File 
getTempFile() throws IOException { + return getTempFile("test", ".xml"); + } + + public static File getTempFile(String prefix, String suffix) throws IOException { + return getTempFile("target", prefix, suffix); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + public static File getTempFile(String path, String prefix, String suffix) throws IOException { + File f = new File(path); + if (!f.exists()) { + f.mkdirs(); + } + return File.createTempFile(prefix, suffix, f); + } + + /** + * Creates data in the feed path with the given timestamp. + * + * @param feedName + * @param cluster + * @param time + * @param inputFile + * @throws FalconException + * @throws ParseException + * @throws IOException + */ + public void createData(String feedName, String cluster, String time, + String inputFile) throws FalconException, ParseException, IOException { + String feedPath = getFeedPathForTS(cluster, feedName, time); + fs.mkdirs(new Path(feedPath)); + fs.copyFromLocalFile(new Path(getAbsolutePath("/" + inputFile)), new Path(feedPath)); + } + + protected String getFeedPathForTS(String cluster, String feedName, + String timeStamp) throws FalconException, ParseException { + Entity existingEntity = configStore.get(EntityType.FEED, feedName); + if (existingEntity == null) { + throw new FalconException("Feed Not Found " + feedName); + } + Feed feed = (Feed) existingEntity; + Storage rawStorage = FeedHelper.createStorage(cluster, feed); + String feedPathTemplate = rawStorage.getUriTemplate(LocationType.DATA); + Properties properties = ExpressionHelper.getTimeVariables(ExpressionHelper.FORMATTER.get().parse(timeStamp), + TimeZone.getTimeZone("UTC")); + String feedPath = ExpressionHelper.substitute(feedPathTemplate, properties); + return feedPath; + } + + + public String getAbsolutePath(String fileName) { + return this.getClass().getResource(fileName).getPath(); + } + + public void createDir(String path) throws IOException { + fs.mkdirs(new Path(path)); + } + + /** + * Wait for a 
condition, expressed via a {@link Predicate} to become true. + * + * @param timeout maximum time in milliseconds to wait for the predicate to become true. + * @param predicate predicate waiting on. + * @return the waited time. + */ + protected long waitFor(int timeout, Predicate predicate) { + long started = System.currentTimeMillis(); + long mustEnd = System.currentTimeMillis() + timeout; + long lastEcho = 0; + try { + long waiting = mustEnd - System.currentTimeMillis(); + LOG.info("Waiting up to [{}] msec", waiting); + while (!(predicate.evaluate()) && System.currentTimeMillis() < mustEnd) { + if ((System.currentTimeMillis() - lastEcho) > 5000) { + waiting = mustEnd - System.currentTimeMillis(); + LOG.info("Waiting up to [{}] msec", waiting); + lastEcho = System.currentTimeMillis(); + } + Thread.sleep(5000); + } + if (!predicate.evaluate()) { + LOG.info("Waiting timed out after [{}] msec", timeout); + } + return System.currentTimeMillis() - started; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + protected long waitForStatus(final EntityType entityType, final String entityName, final String instanceTime) { + return waitFor(20000, new Predicate() { + public boolean evaluate() throws Exception { + InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(entityType, + entityName, instanceTime); + return InstancesResult.WorkflowStatus.SUCCEEDED.equals(status); + } + }); + } + + public void assertStatus(APIResult apiResult) { + Assert.assertEquals(APIResult.Status.SUCCEEDED, apiResult.getStatus()); + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java new file mode 100644 index 0000000..855be79 --- /dev/null +++ 
b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.unit; + +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.InstancesResult; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Test cases of falcon jobs using Local Oozie and LocalJobRunner. 
+ */ +public class TestFalconUnit extends FalconUnitTestBase { + + @Test + public void testProcessInstanceExecution() throws Exception { + // submit with default props + submitCluster(); + // submitting feeds + APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml")); + assertStatus(result); + result = submit(EntityType.FEED, getAbsolutePath("/outfeed.xml")); + assertStatus(result); + // submitting and scheduling process + String scheduleTime = "2015-06-20T00:00Z"; + createData("in", "local", scheduleTime, "input.txt"); + result = submitProcess(getAbsolutePath("/process.xml"), "/app/oozie-mr"); + assertStatus(result); + result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml")); + assertStatus(result); + waitForStatus(EntityType.PROCESS, "process", scheduleTime); + InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(EntityType.PROCESS, + "process", scheduleTime); + Assert.assertEquals(InstancesResult.WorkflowStatus.SUCCEEDED, status); + String outPath = getFeedPathForTS("local", "out", scheduleTime); + Assert.assertTrue(getFileSystem().exists(new Path(outPath))); + FileStatus[] files = getFileSystem().listStatus(new Path(outPath)); + Assert.assertTrue(files.length > 0); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/test/java/org/apache/falcon/unit/examples/JavaExample.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/examples/JavaExample.java b/unit/src/test/java/org/apache/falcon/unit/examples/JavaExample.java new file mode 100644 index 0000000..95e320b --- /dev/null +++ b/unit/src/test/java/org/apache/falcon/unit/examples/JavaExample.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.unit.examples; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; + +/** + * Java Example for file copy. + */ +public final class JavaExample { + + private JavaExample() {} + + public static void main(String[] args) throws IOException { + System.out.println("Java Main Example"); + + if (args.length != 2) { + throw new IllegalArgumentException("No of arguments should be two"); + } + String inputPath = args[0]; + String outPath = args[1]; + FileSystem fs = FileSystem.get(new Configuration()); + fs.mkdirs(new Path(outPath)); + OutputStream out = fs.create(new Path(outPath + "/" + "part")); + FileStatus[] files = fs.listStatus(new Path(inputPath)); + if (files != null) { + for (FileStatus file : files) { + BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file.getPath()))); + String line; + while ((line = reader.readLine()) != null) { + if (!line.startsWith("#")) { + out.write(line.getBytes()); + out.write("\n".getBytes()); + System.out.println(line); + } + } + reader.close(); + } + } + out.close(); + } +} 
http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/test/resources/cluster-template.xml ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/cluster-template.xml b/unit/src/test/resources/cluster-template.xml new file mode 100644 index 0000000..d0c9b24 --- /dev/null +++ b/unit/src/test/resources/cluster-template.xml @@ -0,0 +1,36 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<cluster colo="##colo##" description="" name="##cluster##" xmlns="uri:falcon:cluster:0.1"> + <interfaces> + <interface type="readonly" endpoint="jail://global:00" + version="0.20.2"/> + <interface type="write" endpoint="jail://global:00" + version="0.20.2"/> + <interface type="execute" endpoint="local" version="0.20.2"/> + <interface type="workflow" endpoint="localoozie" + version="3.1"/> + </interfaces> + <locations> + <location name="staging" path="/projects/falcon/staging"/> + <location name="temp" path="/tmp"/> + <location name="working" path="/projects/falcon/working"/> + </locations> + <properties> + </properties> +</cluster> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/test/resources/infeed.xml ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/infeed.xml b/unit/src/test/resources/infeed.xml new file mode 100644 index 0000000..509d868 --- /dev/null +++ b/unit/src/test/resources/infeed.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<feed description="input" name="in" xmlns="uri:falcon:feed:0.1"> + <groups>inputs</groups> + + <frequency>minutes(1)</frequency> + <timezone>UTC</timezone> + <late-arrival cut-off="hours(1)"/> + + <clusters> + <cluster name="local"> + <validity start="2013-01-01T00:00Z" end="2030-01-01T00:00Z"/> + <retention limit="hours(400000)" action="delete"/> + </cluster> + </clusters> + + <locations> + <location type="data" path="/data/in/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/> + </locations> + + <ACL owner="user" group="user" permission="0x644"/> + <schema location="/schema/log/log.format.csv" provider="csv"/> +</feed> http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/test/resources/input.txt ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/input.txt b/unit/src/test/resources/input.txt new file mode 100644 index 0000000..cb1d8c0 --- /dev/null +++ b/unit/src/test/resources/input.txt @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +Hello Falcon Unit !!!!!! 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/test/resources/outfeed.xml ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/outfeed.xml b/unit/src/test/resources/outfeed.xml new file mode 100644 index 0000000..017afbe --- /dev/null +++ b/unit/src/test/resources/outfeed.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<feed description="output" name="out" xmlns="uri:falcon:feed:0.1"> + <groups>outputs</groups> + + <frequency>minutes(1)</frequency> + <timezone>UTC</timezone> + <late-arrival cut-off="hours(1)"/> + + <clusters> + <cluster name="local"> + <validity start="2013-01-01T00:00Z" end="2030-01-01T00:00Z"/> + <retention limit="hours(2)" action="delete"/> + </cluster> + </clusters> + + <locations> + <location type="data" path="/data/out/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/> + </locations> + + <ACL owner="user" group="user" permission="0x644"/> + <schema location="/schema/out/out.format.csv" provider="csv"/> +</feed> http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/test/resources/process.xml ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/process.xml b/unit/src/test/resources/process.xml new file mode 100644 index 0000000..6854311 --- /dev/null +++ b/unit/src/test/resources/process.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<process name="process" xmlns="uri:falcon:process:0.1"> + <clusters> + <cluster name="local"> + <validity start="2013-11-18T00:05Z" end="2013-11-21T01:05Z"/> + </cluster> + </clusters> + + <parallel>5</parallel> + <order>FIFO</order> + <frequency>minutes(1)</frequency> + <timezone>UTC</timezone> + + <inputs> + <!-- In the workflow, the input paths will be available in a variable 'inpaths' --> + <input name="inpaths" feed="in" start="now(0,0)" end="now(0,0)" /> + </inputs> + + <outputs> + <!-- In the workflow, the output path will be available in a variable 'outpath' --> + <output name="outpath" feed="out" instance="now(0,0)"/> + </outputs> + + <properties> + <!-- In the workflow, these properties will be available with variable - key --> + <property name="queueName" value="default"/> + <!-- The schedule time available as a property in workflow --> + <property name="time" value="${instanceTime()}"/> + </properties> + + <workflow engine="oozie" path="/app/oozie-mr"/> +</process> http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/unit/src/test/resources/workflow.xml ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/workflow.xml b/unit/src/test/resources/workflow.xml new file mode 100644 index 0000000..8b4566c --- /dev/null +++ b/unit/src/test/resources/workflow.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-wf"> + <start to="java-node"/> + <action name="java-node"> + <java> + <job-tracker>local</job-tracker> + <name-node>jail://global:00</name-node> + <configuration> + <property> + <name>mapred.job.queue.name</name> + <value>default</value> + </property> + </configuration> + <main-class>org.apache.falcon.unit.examples.JavaExample</main-class> + <arg>${inpaths}</arg> + <arg>${outpath}</arg> + </java> + <ok to="end"/> + <error to="fail"/> + </action> + <kill name="fail"> + <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> + </kill> + <end name="end"/> +</workflow-app> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java index 47b51fe..c602ffb 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java @@ -42,7 +42,7 @@ import org.apache.oozie.client.BundleJob; import org.apache.oozie.client.CoordinatorJob; import org.apache.oozie.client.Job; import org.apache.oozie.client.Job.Status; -import org.apache.oozie.client.ProxyOozieClient; +import org.apache.oozie.client.OozieClient; import org.testng.Assert; import 
org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -292,7 +292,7 @@ public class EntityManagerJerseyIT { OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING); List<BundleJob> bundles = OozieTestUtils.getBundles(context); Assert.assertEquals(bundles.size(), 1); - ProxyOozieClient ozClient = OozieTestUtils.getOozieClient(context.getCluster().getCluster()); + OozieClient ozClient = OozieTestUtils.getOozieClient(context.getCluster().getCluster()); String bundle = bundles.get(0).getId(); String coordId = ozClient.getBundleJobInfo(bundle).getCoordinators().get(0).getId(); @@ -364,7 +364,7 @@ public class EntityManagerJerseyIT { OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING); List<BundleJob> bundles = OozieTestUtils.getBundles(context); Assert.assertEquals(bundles.size(), 1); - ProxyOozieClient ozClient = OozieTestUtils.getOozieClient(context.getCluster().getCluster()); + OozieClient ozClient = OozieTestUtils.getOozieClient(context.getCluster().getCluster()); String bundle = bundles.get(0).getId(); String coordId = ozClient.getBundleJobInfo(bundle).getCoordinators().get(0).getId(); http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java index 6e58064..ece4fbf 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java @@ -29,6 +29,7 @@ import org.apache.falcon.resource.InstancesResult.WorkflowStatus; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.util.OozieTestUtils; import org.apache.falcon.workflow.engine.OozieClientFactory; +import 
org.apache.oozie.client.OozieClient; import org.apache.oozie.client.ProxyOozieClient; import org.apache.oozie.client.WorkflowJob; import org.testng.Assert; @@ -224,7 +225,7 @@ public class ProcessInstanceManagerIT { private void waitForWorkflow(String instance, WorkflowJob.Status status) throws Exception { TestContext context = new TestContext(); ExternalId extId = new ExternalId(context.processName, Tag.DEFAULT, EntityUtil.parseDateUTC(instance)); - ProxyOozieClient ozClient = OozieClientFactory.get( + OozieClient ozClient = OozieClientFactory.get( (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, context.clusterName)); String jobId = ozClient.getJobId(extId.getId()); WorkflowJob jobInfo = null; http://git-wip-us.apache.org/repos/asf/falcon/blob/3f00d051/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java index 02d1011..804b2ed 100644 --- a/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java +++ b/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java @@ -32,6 +32,7 @@ import org.apache.oozie.client.BundleJob; import org.apache.oozie.client.CoordinatorJob; import org.apache.oozie.client.Job; import org.apache.oozie.client.Job.Status; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.ProxyOozieClient; import org.apache.oozie.client.WorkflowJob; @@ -49,11 +50,11 @@ public final class OozieTestUtils { private OozieTestUtils() { } - public static ProxyOozieClient getOozieClient(TestContext context) throws FalconException { + public static OozieClient getOozieClient(TestContext context) throws FalconException { return getOozieClient(context.getCluster().getCluster()); } - public static ProxyOozieClient getOozieClient(Cluster cluster) throws FalconException { + public static OozieClient 
getOozieClient(Cluster cluster) throws FalconException { return OozieClientFactory.get(cluster); } @@ -63,7 +64,7 @@ public final class OozieTestUtils { return bundles; } - ProxyOozieClient ozClient = OozieClientFactory.get(context.getCluster().getCluster()); + OozieClient ozClient = OozieClientFactory.get(context.getCluster().getCluster()); return ozClient.getBundleJobsInfo("name=FALCON_PROCESS_" + context.getProcessName(), 0, 10); } @@ -72,7 +73,7 @@ public final class OozieTestUtils { return true; } - ProxyOozieClient ozClient = getOozieClient(context); + OozieClient ozClient = getOozieClient(context); List<BundleJob> bundles = getBundles(context); if (bundles != null) { for (BundleJob bundle : bundles) { @@ -88,7 +89,7 @@ public final class OozieTestUtils { } public static void waitForInstanceToComplete(TestContext context, String jobId) throws Exception { - ProxyOozieClient ozClient = getOozieClient(context); + OozieClient ozClient = getOozieClient(context); String lastStatus = null; for (int i = 0; i < 50; i++) { WorkflowJob job = ozClient.getJobInfo(jobId); @@ -117,7 +118,7 @@ public final class OozieTestUtils { } private static List<WorkflowJob> getRunningJobs(TestContext context, String entityName) throws Exception { - ProxyOozieClient ozClient = getOozieClient(context); + OozieClient ozClient = getOozieClient(context); return ozClient.getJobsInfo( ProxyOozieClient.FILTER_STATUS + '=' + Job.Status.RUNNING + ';' + ProxyOozieClient.FILTER_NAME + '=' + "FALCON_PROCESS_DEFAULT_" + entityName); @@ -133,7 +134,7 @@ public final class OozieTestUtils { } public static void waitForBundleStart(TestContext context, String bundleId, Job.Status... 
status) throws Exception { - ProxyOozieClient ozClient = getOozieClient(context); + OozieClient ozClient = getOozieClient(context); Set<Job.Status> statuses = new HashSet<Job.Status>(Arrays.asList(status)); Status bundleStatus = null; @@ -162,7 +163,7 @@ public final class OozieTestUtils { } public static WorkflowJob getWorkflowJob(Cluster cluster, String filter) throws Exception { - ProxyOozieClient ozClient = getOozieClient(cluster); + OozieClient ozClient = getOozieClient(cluster); List<WorkflowJob> jobs; while (true) {
