http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/entity/spark/spark-process-pi.xml ---------------------------------------------------------------------- diff --git a/examples/entity/spark/spark-process-pi.xml b/examples/entity/spark/spark-process-pi.xml new file mode 100644 index 0000000..65c81cf --- /dev/null +++ b/examples/entity/spark/spark-process-pi.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<process name="spark-pi" xmlns="uri:falcon:process:0.1"> + <clusters> + <cluster name="local"> + <validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/> + </cluster> + </clusters> + + <parallel>1</parallel> + <order>LIFO</order> + <frequency>minutes(5)</frequency> + <timezone>UTC</timezone> + + <workflow engine="spark" path="/app/spark/"/> + <spark-attributes> + <master>local</master> + <name>Spark PI</name> + <class>org.apache.falcon.example.spark.SparkPI</class> + <jar>/app/spark/lib/falcon-examples.jar</jar> + <spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts> + <arg>2</arg> + </spark-attributes> + + <retry policy="periodic" delay="minutes(3)" attempts="3"/> + +</process>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/entity/spark/spark-process.xml ---------------------------------------------------------------------- diff --git a/examples/entity/spark/spark-process.xml b/examples/entity/spark/spark-process.xml new file mode 100644 index 0000000..b9ecc98 --- /dev/null +++ b/examples/entity/spark/spark-process.xml @@ -0,0 +1,53 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<process name="spark-process" xmlns="uri:falcon:process:0.1"> + <clusters> + <cluster name="local"> + <validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/> + </cluster> + </clusters> + + <parallel>1</parallel> + <order>LIFO</order> + <frequency>minutes(5)</frequency> + <timezone>UTC</timezone> + + <inputs> + <!-- In the workflow, the input paths will be available in a variable 'inpaths' --> + <input name="inpaths" feed="in" start="now(0,-5)" end="now(0,-1)"/> + </inputs> + + <outputs> + <!-- In the workflow, the output path will be available in a variable 'outpath' --> + <output name="outpath" feed="out" instance="now(0,0)"/> + </outputs> + + <workflow engine="spark" path="/app/spark"/> + <spark-attributes> + <master>local</master> + <name>Java Spark Wordcount</name> + <class>org.apache.falcon.example.spark.SparkWordCount</class> + <jar>/app/spark/lib/falcon-examples.jar</jar> + <spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts> + </spark-attributes> + + <retry policy="periodic" delay="minutes(3)" attempts="3"/> + +</process> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/entity/spark/spark-sql-process.xml ---------------------------------------------------------------------- diff --git a/examples/entity/spark/spark-sql-process.xml b/examples/entity/spark/spark-sql-process.xml new file mode 100644 index 0000000..cdd2ccc --- /dev/null +++ b/examples/entity/spark/spark-sql-process.xml @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<process name="spark-sql-process" xmlns="uri:falcon:process:0.1"> + <!-- where --> + <clusters> + <cluster name="hcat-local"> + <validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/> + </cluster> + </clusters> + + <!-- when --> + <parallel>1</parallel> + <order>LIFO</order> + <frequency>minutes(5)</frequency> + <timezone>UTC</timezone> + + <!-- what --> + <inputs> + <!-- In the workflow, the input feed instances will be available in a variable 'inparts' --> + <input name="inparts" feed="hcat-in" start="now(0,-5)" end="now(0,-1)"/> + </inputs> + + <outputs> + <!-- In the workflow, the output feed instance will be available in a variable 'outpart' --> + <output name="outpart" feed="hcat-out" instance="now(0,0)"/> + </outputs> + + <workflow engine="spark" path="/app/spark"/> + <spark-attributes> + <master>local</master> + <name>Spark SQL</name> + <class>org.apache.falcon.example.spark.SparkSQLProcessTable</class> + <jar>/app/spark/lib/falcon-examples.jar</jar> + <spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts> + </spark-attributes> + + <retry policy="periodic" delay="minutes(3)" attempts="3"/> + +</process> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index 2ff3011..a1aedf8 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -23,11 +23,12 @@ <parent> <groupId>org.apache.falcon</groupId> <artifactId>falcon-main</artifactId> - 
<version>0.10-SNAPSHOT</version> + <version>0.10</version> </parent> <artifactId>falcon-examples</artifactId> <description>Apache Falcon Examples</description> <name>Apache Falcon Examples</name> + <packaging>jar</packaging> <dependencies> <dependency> @@ -35,6 +36,44 @@ <artifactId>hadoop-client</artifactId> </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.apache.mesos</groupId> + <artifactId>mesos</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.10</artifactId> + <version>${spark.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_2.10</artifactId> + <version>${spark.version}</version> + </dependency> + <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/src/main/java/org/apache/falcon/example/spark/SparkPI.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/falcon/example/spark/SparkPI.java b/examples/src/main/java/org/apache/falcon/example/spark/SparkPI.java new file mode 100644 index 0000000..7ae0b6b --- /dev/null +++ b/examples/src/main/java/org/apache/falcon/example/spark/SparkPI.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.example.spark; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; + +import java.util.ArrayList; +import java.util.List; + +/** + * Computes an approximation to pi by counting random points of the square [-1, 1) x [-1, 1) that fall inside the unit circle. + * Usage: SparkPI [slices] - slices defaults to 2 unless exactly one argument is given; the number of samples equals the slice count. + */ +public final class SparkPI { + + private SparkPI() { + } + + public static void main(String[] args) throws Exception { + SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi"); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + + int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; + int n = 1 * slices; + System.out.println("n:"+n+"\tslices:"+slices); + List<Integer> l = new ArrayList<Integer>(n); + for (int i = 0; i < n; i++) { + l.add(i); + } + + JavaRDD<Integer> dataSet = jsc.parallelize(l, slices); + + int count = dataSet.map(new Function<Integer, Integer>() { + @Override + public Integer call(Integer integer) { + double x = Math.random() * 2 - 1; + double y = Math.random() * 2 - 1; + return (x * x + y * y < 1) ? 
1 : 0; + } + }).reduce(new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer integer, Integer integer2) { + return integer + integer2; + } + }); + + System.out.println("Pi is roughly " + 4.0 * count / n); + + jsc.stop(); + } +} + http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java b/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java new file mode 100644 index 0000000..5e9f092 --- /dev/null +++ b/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.example.spark; + +import org.apache.spark.SparkContext; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.hive.HiveContext; + +/** + * Spark SQL Example. 
+ */ + +public final class SparkSQLProcessTable { + + private SparkSQLProcessTable() { + } + public static void main(String[] args) { + if (args.length < 6) { // args[0]..args[5] are all read when the query is built below + System.err.println("Arguments must contain details for input or output table"); + System.exit(1); // non-zero so a mis-configured run is reported as a failure + } + + SparkConf conf = new SparkConf().setAppName("SparkSQL example"); + SparkContext sc = new SparkContext(conf); + HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc); + // NOTE(review): table names, partition spec and filter are concatenated verbatim from args (trusted input, no escaping) + String sqlQuery = "FROM " +args[2]+"."+args[1]+ " INSERT OVERWRITE TABLE " +args[5]+"."+args[4] + +" PARTITION("+args[3]+") SELECT word, SUM(cnt) AS cnt WHERE "+args[0]+" GROUP BY word"; + + DataFrame df = sqlContext.sql(sqlQuery); + df.show(); + } +} + http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/src/main/java/org/apache/falcon/example/spark/SparkWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/falcon/example/spark/SparkWordCount.java b/examples/src/main/java/org/apache/falcon/example/spark/SparkWordCount.java new file mode 100644 index 0000000..f74a536 --- /dev/null +++ b/examples/src/main/java/org/apache/falcon/example/spark/SparkWordCount.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.example.spark; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import scala.Tuple2; + +import java.util.Arrays; + +/** + * Spark Word Count example: reads the text at args[0], counts space-separated words and saves the counts to args[1]. + */ +public final class SparkWordCount { + + private SparkWordCount() { + } + protected static final FlatMapFunction<String, String> WORDS_EXTRACTOR = + new FlatMapFunction<String, String>() { + public Iterable<String> call(String s) throws Exception { + return Arrays.asList(s.split(" ")); + } + }; + + protected static final PairFunction<String, String, Integer> WORDS_MAPPER = + new PairFunction<String, String, Integer>() { + public Tuple2<String, Integer> call(String s) throws Exception { + return new Tuple2<String, Integer>(s, 1); + } + }; + + protected static final Function2<Integer, Integer, Integer> WORDS_REDUCER = + new Function2<Integer, Integer, Integer>() { + public Integer call(Integer a, Integer b) throws Exception { + return a + b; + } + }; + + public static void main(String[] args) { + if (args.length < 2) { // both args[0] (input) and args[1] (output) are used below + System.err.println("Please provide the input file full path and the output path as arguments"); + System.exit(1); // non-zero so a mis-configured run is reported as a failure + } + + SparkConf conf = new SparkConf().setAppName("Java WordCount"); + JavaSparkContext context = new JavaSparkContext(conf); + JavaRDD<String> file = context.textFile(args[0]); + JavaRDD<String> words = file.flatMap(WORDS_EXTRACTOR); + JavaPairRDD<String, Integer> pairs = words.mapToPair(WORDS_MAPPER); + JavaPairRDD<String, Integer> counter = pairs.reduceByKey(WORDS_REDUCER); + counter.saveAsTextFile(args[1]); + } +} 
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/extensions/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/pom.xml b/extensions/pom.xml index 4243dee..09d4249 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.falcon</groupId> <artifactId>falcon-main</artifactId> - <version>0.10-SNAPSHOT</version> + <version>0.10</version> </parent> <artifactId>falcon-extensions</artifactId> <description>Apache Falcon server side extensions Module</description> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java index 949aea5..9222e0a 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java @@ -39,6 +39,7 @@ public class HiveMirroringExtension extends AbstractExtension { private static final String ALL_TABLES = "*"; private static final String COMMA_DELIMITER = ","; private static final String SECURE_RESOURCE = "-secure"; + private static final String NOT_APPLICABLE = "NA"; @Override public String getName() { @@ -122,6 +123,12 @@ public class HiveMirroringExtension extends AbstractExtension { additionalProperties.put(HiveMirroringExtensionProperties.HIVE_MIRRORING_JOB_NAME.getName(), jobName); + // Get the first source DB + additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_DATABASE.getName(), + extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_DATABASES + .getName()).trim().split(",")[0] + ); + 
String clusterName = extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName()); // Add required properties of cluster where job should run additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_RUN.getName(), @@ -210,7 +217,7 @@ public class HiveMirroringExtension extends AbstractExtension { String replicationMaxMaps = extensionProperties.getProperty(HiveMirroringExtensionProperties.MAX_MAPS.getName()); if (StringUtils.isBlank(replicationMaxMaps)) { - additionalProperties.put(HiveMirroringExtensionProperties.MAX_MAPS.getName(), "5"); + additionalProperties.put(HiveMirroringExtensionProperties.MAX_MAPS.getName(), "2"); } String distcpMaxMaps = extensionProperties.getProperty( @@ -230,6 +237,16 @@ public class HiveMirroringExtension extends AbstractExtension { additionalProperties.put(HiveMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName(), "false"); } + if (StringUtils.isBlank( + extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_STAGING_PATH.getName()))) { + additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_STAGING_PATH.getName(), NOT_APPLICABLE); + } + + if (StringUtils.isBlank( + extensionProperties.getProperty(HiveMirroringExtensionProperties.TARGET_STAGING_PATH.getName()))) { + additionalProperties.put(HiveMirroringExtensionProperties.TARGET_STAGING_PATH.getName(), NOT_APPLICABLE); + } + return additionalProperties; } } http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java index 6c4f58d..828817b 100644 --- 
a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java @@ -27,6 +27,7 @@ public enum HiveMirroringExtensionProperties { SOURCE_METASTORE_URI("sourceMetastoreUri", "Source Hive metastore uri", false), SOURCE_HS2_URI("sourceHiveServer2Uri", "Source HS2 uri"), SOURCE_DATABASES("sourceDatabases", "List of databases to replicate"), + SOURCE_DATABASE("sourceDatabase", "Database to verify the setup connection", false), SOURCE_TABLES("sourceTables", "List of tables to replicate", false), SOURCE_STAGING_PATH("sourceStagingPath", "Location of source staging path", false), SOURCE_NN("sourceNN", "Source name node", false), @@ -50,13 +51,13 @@ public enum HiveMirroringExtensionProperties { MAX_EVENTS("maxEvents", "Maximum events to replicate", false), MAX_MAPS("replicationMaxMaps", "Maximum number of maps used during replication", false), DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp", false), - MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication"), + MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication", false), CLUSTER_FOR_JOB_RUN("clusterForJobRun", "Cluster on which replication job runs", false), - CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("Job cluster kerberos principal", - "Write EP of cluster on which replication job runs", false), + CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal", "Job cluster kerberos principal", + false), CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of cluster on which replication job runs", false), TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled", false), - HIVE_MIRRORING_JOB_NAME("jobName", "Unique hive replication job name", false); + HIVE_MIRRORING_JOB_NAME("hiveJobName", "Unique 
hive replication job name", false); private final String name; private final String description; http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin-core/pom.xml ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/pom.xml b/falcon-regression/merlin-core/pom.xml index fa3c939..367ef07 100644 --- a/falcon-regression/merlin-core/pom.xml +++ b/falcon-regression/merlin-core/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.falcon.regression</groupId> <artifactId>falcon-regression</artifactId> - <version>0.10-SNAPSHOT</version> + <version>0.10</version> </parent> <artifactId>falcon-merlin-core</artifactId> <description>merlin-core - utilities for Apache Falcon regression suite</description> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/pom.xml ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/pom.xml b/falcon-regression/merlin/pom.xml index 73be13e..33012fc 100644 --- a/falcon-regression/merlin/pom.xml +++ b/falcon-regression/merlin/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.falcon.regression</groupId> <artifactId>falcon-regression</artifactId> - <version>0.10-SNAPSHOT</version> + <version>0.10</version> </parent> <artifactId>falcon-merlin</artifactId> <description>Merlin - Regression test suite for Apache Falcon</description> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java index ec2b877..6405b30 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java 
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java @@ -56,7 +56,7 @@ import java.util.List; * On adding further late data it checks whether the data has been replicated correctly in the given late cut-off time. * Assuming that late frequency set in server is 3 minutes. Although value can be changed according to requirement. */ -@Test(groups = { "distributed", "embedded", "sanity" }) +@Test(groups = { "distributed", "embedded", "sanity", "multiCluster" }) public class FeedLateRerunTest extends BaseTestClass { private ColoHelper cluster1 = servers.get(0); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java index dad0dc2..3367817 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java @@ -60,7 +60,7 @@ import java.util.Map; * feed replication test. * Replicates empty directories as well as directories containing data. 
*/ -@Test(groups = { "distributed", "embedded", "sanity" }) +@Test(groups = { "distributed", "embedded", "sanity", "multiCluster" }) public class FeedReplicationTest extends BaseTestClass { private ColoHelper cluster1 = servers.get(0); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java index df1716f..dbb93eb 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java @@ -173,7 +173,7 @@ public class InstanceSummaryTest extends BaseTestClass { /** * Adjust multi-cluster process. Submit and schedule it. Get its instances summary. */ - @Test(enabled = true, timeOut = 1200000) + @Test(enabled = true, timeOut = 1200000, groups = "multiCluster") public void testSummaryMultiClusterProcess() throws JAXBException, ParseException, IOException, URISyntaxException, AuthenticationException, InterruptedException { @@ -208,7 +208,7 @@ public class InstanceSummaryTest extends BaseTestClass { /** * Adjust multi-cluster feed. Submit and schedule it. Get its instances summary. 
*/ - @Test(enabled = true, timeOut = 1200000) + @Test(enabled = true, timeOut = 1200000, groups = "multiCluster") public void testSummaryMultiClusterFeed() throws JAXBException, ParseException, IOException, URISyntaxException, OozieClientException, AuthenticationException, InterruptedException { http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java index 20f8f46..ce4c903 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java @@ -48,7 +48,7 @@ import java.util.List; /** * Process instance mixed colo tests. 
*/ -@Test(groups = { "distributed", "embedded" }) +@Test(groups = { "distributed", "embedded", "multiCluster" }) public class ProcessInstanceColoMixedTest extends BaseTestClass { private final String baseTestHDFSDir = cleanAndGetTestDir(); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java index eb20d7c..db020ec 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java @@ -119,7 +119,7 @@ public class HCatFeedOperationsTest extends BaseTestClass { * * @throws Exception */ - @Test(groups = {"singleCluster"}) + @Test(groups = {"multiCluster"}) public void submitFeedWhenTableDoesNotExist() throws Exception { Bundle.submitCluster(bundles[1]); feed = bundles[1].getInputFeedFromBundle(); @@ -159,7 +159,7 @@ public class HCatFeedOperationsTest extends BaseTestClass { * * @throws Exception */ - @Test + @Test(groups = {"multiCluster"}) public void submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget() throws Exception { Bundle.submitCluster(bundles[0], bundles[1]); final String startDate = "2010-01-01T20:00Z"; @@ -192,7 +192,7 @@ public class HCatFeedOperationsTest extends BaseTestClass { * * @throws Exception */ - @Test + @Test(groups = {"multiCluster"}) public void suspendAndResumeReplicationFeed() throws Exception { submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget(); @@ -215,7 +215,7 @@ public class HCatFeedOperationsTest extends BaseTestClass { * * @throws Exception */ - @Test + @Test(groups 
= {"multiCluster"}) public void deleteReplicationFeed() throws Exception { submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget(); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java index a96b17e..6643ce5 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java @@ -64,7 +64,7 @@ import java.util.Map; /** * Tests for replication with hcat. */ -@Test(groups = "embedded") +@Test(groups = {"embedded", "multiCluster"}) public class HCatReplicationTest extends BaseTestClass { private static final Logger LOGGER = Logger.getLogger(HCatReplicationTest.class); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java index 4a2d913..07996d5 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java @@ -50,7 +50,7 @@ import java.util.List; /** * Hdfs recipe test. 
*/ -@Test(groups = "embedded") +@Test(groups = {"embedded", "multiCluster"}) public class HdfsRecipeTest extends BaseTestClass { private static final Logger LOGGER = Logger.getLogger(HdfsRecipeTest.class); private final ColoHelper cluster = servers.get(0); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java index 4dab0db..7cd71e1 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java @@ -65,7 +65,7 @@ import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createVanil /** * Hive DR Testing. 
*/ -@Test(groups = "embedded") +@Test(groups = {"embedded", "multiCluster"}) public class HiveDRTest extends BaseTestClass { private static final Logger LOGGER = Logger.getLogger(HiveDRTest.class); private static final String DB_NAME = "hdr_sdb1"; http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java index 5efd69f..e281bee 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java @@ -59,7 +59,7 @@ import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createVanil /** * Hive DR Testing for Hive database replication. 
*/ -@Test(groups = "embedded") +@Test(groups = {"embedded", "multiCluster"}) public class HiveDbDRTest extends BaseTestClass { private static final Logger LOGGER = Logger.getLogger(HiveDbDRTest.class); private final ColoHelper cluster = servers.get(0); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java index 849f67a..ac91d59 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java @@ -128,7 +128,7 @@ public class EntitySummaryTest extends BaseTestClass { * Get status of 7 feeds and 7 instances of each feed the call should give correct information, * instance info must be recent. */ - @Test + @Test(groups = "multiCluster") public void getFeedSummary() throws Exception { //prepare feed template. 
bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java index 4f86594..93efbac 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java @@ -59,7 +59,7 @@ import java.util.List; * expected instance statuses which are being compared with actual result of -list request * with different parameters in different order, variation, etc. */ -@Test(groups = { "distributed", "embedded", "sanity" }) +@Test(groups = { "distributed", "embedded", "sanity", "multiCluster"}) public class ListFeedInstancesTest extends BaseTestClass { private static final Logger LOGGER = Logger.getLogger(ListFeedInstancesTest.class); private OozieClient cluster2OC = serverOC.get(1); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java index 54e7805..81e0a7e 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java +++ 
b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java @@ -184,7 +184,7 @@ public class NativeScheduleTest extends BaseTestClass { * Successfully schedule process via native scheduler through prism and server on multiple cluster. * Schedule the same process on oozie. It should fail. */ - @Test(groups = {"prism", "0.2"}) + @Test(groups = {"prism", "0.2", "multiCluster"}) public void scheduleProcessWithNativeOnTwoClusters() throws Exception { ProcessMerlin processMerlinNative = bundles[0].getProcessObject(); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java index 405725d..0a5e9ce 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java @@ -49,7 +49,7 @@ import java.io.IOException; /** * Update replication feed tests. 
*/ -@Test(groups = { "distributed", "embedded" }) +@Test(groups = { "distributed", "embedded", "multiCluster" }) public class PrismFeedReplicationUpdateTest extends BaseTestClass { private ColoHelper cluster1 = servers.get(0); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java index 137ef6f..1a1dc98 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java @@ -38,7 +38,7 @@ import java.io.IOException; /** * Schedule feed via prism tests. 
*/ -@Test(groups = { "distributed", "embedded" }) +@Test(groups = { "distributed", "embedded", "multiCluster"}) public class PrismFeedScheduleTest extends BaseTestClass { private OozieClient cluster1OC = serverOC.get(0); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java index a5220e3..6caac9f 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java @@ -96,7 +96,7 @@ public class PrismFeedUpdateTest extends BaseTestClass { * Set 2 processes with common output feed. Second one is zero-input process. Update feed * queue. TODO : complete test case */ - @Test(enabled = true, timeOut = 1200000) + @Test(enabled = true, timeOut = 1200000 , groups = "multiCluster") public void updateFeedQueueDependentMultipleProcessOneProcessZeroInput() throws Exception { //cluster1colo and cluster2colo are source. 
feed01 on cluster1colo target cluster2colo, // feed02 on cluster2colo target cluster1colo http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java index 4aa7189..63f793f 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java @@ -80,7 +80,7 @@ public class PrismProcessSnSTest extends BaseTestClass { * Submit and schedule process2 on cluster2. Check that process2 is running and process1 is * not running on cluster2. */ - @Test(groups = {"prism", "0.2", "embedded"}) + @Test(groups = {"prism", "0.2", "embedded", "multiCluster"}) public void testProcessSnSOnBothColos() throws Exception { //schedule both bundles bundles[0].submitAndScheduleProcess(); @@ -100,7 +100,7 @@ public class PrismProcessSnSTest extends BaseTestClass { * on cluster2. Submit process2 but schedule process1 once more. Check that process1 is running * on cluster1 but not on cluster2. */ - @Test(groups = {"prism", "0.2", "embedded"}) + @Test(groups = {"prism", "0.2", "embedded", "multiCluster"}) public void testProcessSnSForSubmittedProcessOnBothColos() throws Exception { //schedule both bundles bundles[0].submitProcess(true); @@ -122,7 +122,7 @@ public class PrismProcessSnSTest extends BaseTestClass { * once more and check that it is still running on cluster1 but process2 isn't running on * cluster2. 
*/ - @Test(groups = {"prism", "0.2", "embedded"}) + @Test(groups = {"prism", "0.2", "embedded", "multiCluster"}) public void testProcessSnSForSubmittedProcessOnBothColosUsingColoHelper() throws Exception { bundles[0].submitProcess(true); @@ -228,7 +228,7 @@ public class PrismProcessSnSTest extends BaseTestClass { * running. Delete both of them. Submit and schedule them once more. Check that they are * running again. */ - @Test(groups = {"prism", "0.2", "embedded"}) + @Test(groups = {"prism", "0.2", "embedded", "multiCluster"}) public void testSnSDeletedProcessOnBothColos() throws Exception { //schedule both bundles final String cluster1Running = cluster1.getClusterHelper().getColoName() + "/RUNNING"; http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorTest.java index e99202b..a7887da 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorTest.java @@ -56,7 +56,7 @@ import java.sql.Connection; import java.util.Arrays; /** UI tests for Mirror Setup Wizard. 
*/ -@Test(groups = "search-ui") +@Test(groups = {"search-ui", "multiCluster"}) public class MirrorTest extends BaseUITestClass { private static final Logger LOGGER = Logger.getLogger(MirrorTest.class); private final String baseTestDir = cleanAndGetTestDir(); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/pom.xml ---------------------------------------------------------------------- diff --git a/falcon-regression/pom.xml b/falcon-regression/pom.xml index daa88cb..987ddaa 100644 --- a/falcon-regression/pom.xml +++ b/falcon-regression/pom.xml @@ -24,11 +24,11 @@ <parent> <groupId>org.apache.falcon</groupId> <artifactId>falcon-main</artifactId> - <version>0.10-SNAPSHOT</version> + <version>0.10</version> </parent> <groupId>org.apache.falcon.regression</groupId> <artifactId>falcon-regression</artifactId> - <version>0.10-SNAPSHOT</version> + <version>0.10</version> <description>Regression Framework for Falcon</description> <name>Apache Falcon Regression</name> <packaging>pom</packaging> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-ui/app/css/img/user.svg ---------------------------------------------------------------------- diff --git a/falcon-ui/app/css/img/user.svg b/falcon-ui/app/css/img/user.svg index 60ac6c5..fb534c2 100644 --- a/falcon-ui/app/css/img/user.svg +++ b/falcon-ui/app/css/img/user.svg @@ -1,4 +1,20 @@ <?xml version="1.0" encoding="utf-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> <!-- Generator: Adobe Illustrator 16.0.3, SVG Export Plug-In . SVG Version: 6.00 Build 0) --> <!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> <svg version="1.1" id="User" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px" http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-ui/app/css/styles/autocomplete-tags.less ---------------------------------------------------------------------- diff --git a/falcon-ui/app/css/styles/autocomplete-tags.less b/falcon-ui/app/css/styles/autocomplete-tags.less index c2f5dc2..4c6fc96 100644 --- a/falcon-ui/app/css/styles/autocomplete-tags.less +++ b/falcon-ui/app/css/styles/autocomplete-tags.less @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + .top-buffer{ padding-top:20px; } @@ -67,4 +85,4 @@ } .suggestions-list:focus{ outline:none; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-ui/app/js/lib/popover.js ---------------------------------------------------------------------- diff --git a/falcon-ui/app/js/lib/popover.js b/falcon-ui/app/js/lib/popover.js index e26c870..57814df 100644 --- a/falcon-ui/app/js/lib/popover.js +++ b/falcon-ui/app/js/lib/popover.js @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + (function(window, angular, undefined){ 'use strict'; @@ -460,4 +478,4 @@ }; } ]); -})(window, window.angular); \ No newline at end of file +})(window, window.angular); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-ui/app/test/e2e/protractor.js ---------------------------------------------------------------------- diff --git a/falcon-ui/app/test/e2e/protractor.js b/falcon-ui/app/test/e2e/protractor.js index 37d6e65..6c2d97e 100644 --- a/falcon-ui/app/test/e2e/protractor.js +++ b/falcon-ui/app/test/e2e/protractor.js @@ -1,7 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + exports.config = { chromeDriver: '../../../node_modules/protractor/selenium/chromedriver', jasmineNodeOpts: { showColors: true } -}; \ No newline at end of file +}; http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-ui/karma.conf.js ---------------------------------------------------------------------- diff --git a/falcon-ui/karma.conf.js b/falcon-ui/karma.conf.js index 11189e8..776249f 100644 --- a/falcon-ui/karma.conf.js +++ b/falcon-ui/karma.conf.js @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + // Karma configuration // Generated on Wed Sep 24 2014 21:15:41 GMT-0500 (CDT) http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-ui/pom.xml ---------------------------------------------------------------------- diff --git a/falcon-ui/pom.xml b/falcon-ui/pom.xml index 93ed9bb..bbb917c 100644 --- a/falcon-ui/pom.xml +++ b/falcon-ui/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.falcon</groupId> <artifactId>falcon-main</artifactId> - <version>0.10-SNAPSHOT</version> + <version>0.10</version> </parent> <artifactId>falcon-ui</artifactId> <packaging>pom</packaging> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/hadoop-dependencies/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-dependencies/pom.xml b/hadoop-dependencies/pom.xml index e2529f1..384cceb 100644 --- a/hadoop-dependencies/pom.xml +++ b/hadoop-dependencies/pom.xml @@ -24,7 +24,7 @@ <parent> <groupId>org.apache.falcon</groupId> <artifactId>falcon-main</artifactId> - <version>0.10-SNAPSHOT</version> + <version>0.10</version> </parent> <artifactId>falcon-hadoop-dependencies</artifactId> <description>Apache Falcon Hadoop Dependencies Module</description> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/lifecycle/pom.xml ---------------------------------------------------------------------- diff --git a/lifecycle/pom.xml b/lifecycle/pom.xml index 725f1e6..b940796 100644 --- a/lifecycle/pom.xml +++ b/lifecycle/pom.xml @@ -24,7 +24,7 @@ <parent> <groupId>org.apache.falcon</groupId> <artifactId>falcon-main</artifactId> - <version>0.10-SNAPSHOT</version> + <version>0.10</version> </parent> <artifactId>falcon-feed-lifecycle</artifactId> <description>Apache Falcon Lifecycle Module</description> 
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/lifecycle/src/main/resources/action/feed/eviction-action.xml ---------------------------------------------------------------------- diff --git a/lifecycle/src/main/resources/action/feed/eviction-action.xml b/lifecycle/src/main/resources/action/feed/eviction-action.xml index 4ab67d2..bded1d6 100644 --- a/lifecycle/src/main/resources/action/feed/eviction-action.xml +++ b/lifecycle/src/main/resources/action/feed/eviction-action.xml @@ -31,7 +31,7 @@ <!-- HCatalog jars --> <property> <name>oozie.action.sharelib.for.java</name> - <value>hcatalog</value> + <value>hcatalog,hive</value> </property> <property> <name>oozie.launcher.oozie.libpath</name> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/messaging/pom.xml ---------------------------------------------------------------------- diff --git a/messaging/pom.xml b/messaging/pom.xml index 667c5d1..3baddd9 100644 --- a/messaging/pom.xml +++ b/messaging/pom.xml @@ -24,7 +24,7 @@ <parent> <groupId>org.apache.falcon</groupId> <artifactId>falcon-main</artifactId> - <version>0.10-SNAPSHOT</version> + <version>0.10</version> </parent> <artifactId>falcon-messaging</artifactId> <description>Apache Falcon JMS messaging Module</description> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java index 8b48e93..90bbdd3 100644 --- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java +++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java @@ -92,8 +92,7 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener { topicSession = (TopicSession) connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE); Topic destination = topicSession.createTopic(topicName); - topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID, - WorkflowNameBuilder.WorkflowName.getJMSFalconSelector(), false); + topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID); topicSubscriber.setMessageListener(this); connection.setExceptionListener(this); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java ---------------------------------------------------------------------- diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java index 0ba9464..cffdb59 100644 --- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java +++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java @@ -83,11 +83,6 @@ public class JMSMessageConsumerTest { public void sendMessages(String topic, WorkflowExecutionContext.Type type) throws JMSException, FalconException, IOException { - sendMessages(topic, type, true); - } - - public void sendMessages(String topic, WorkflowExecutionContext.Type type, boolean isFalconWF) - throws JMSException, FalconException, IOException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); Connection connection = connectionFactory.createConnection(); connection.start(); @@ -105,10 +100,10 @@ public class JMSMessageConsumerTest { message = getMockFalconMessage(i, session); break; case WORKFLOW_JOB: - message = getMockOozieMessage(i, session, isFalconWF); + message = getMockOozieMessage(i, session); break; case COORDINATOR_ACTION: - message = getMockOozieCoordMessage(i, session, isFalconWF); + message = getMockOozieCoordMessage(i, session); default: break; } @@ -117,15 +112,10 @@ public class JMSMessageConsumerTest { } } - 
private Message getMockOozieMessage(int i, Session session, boolean isFalconWF) - throws FalconException, JMSException { + private Message getMockOozieMessage(int i, Session session) throws FalconException, JMSException { TextMessage message = session.createTextMessage(); message.setStringProperty("appType", "WORKFLOW_JOB"); - if (isFalconWF) { - message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1"); - } else { - message.setStringProperty("appName", "OozieSampleShellWF"); - } + message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1"); message.setStringProperty("user", "falcon"); switch(i % 4) { case 0: @@ -152,15 +142,11 @@ public class JMSMessageConsumerTest { return message; } - private Message getMockOozieCoordMessage(int i, Session session, boolean isFalconWF) + private Message getMockOozieCoordMessage(int i, Session session) throws FalconException, JMSException { TextMessage message = session.createTextMessage(); message.setStringProperty("appType", "COORDINATOR_ACTION"); - if (isFalconWF) { - message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1"); - } else { - message.setStringProperty("appName", "OozieSampleShellWF"); - } + message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1"); message.setStringProperty("user", "falcon"); switch(i % 5) { case 0: @@ -245,15 +231,10 @@ public class JMSMessageConsumerTest { sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.POST_PROCESSING); final BrokerView adminView = broker.getAdminView(); - - Assert.assertEquals(adminView.getTotalDequeueCount(), 0); -// Assert.assertEquals(adminView.getTotalEnqueueCount(), 10); Assert.assertEquals(adminView.getTotalConsumerCount(), 2); sendMessages(SECONDARY_TOPIC_NAME, WorkflowExecutionContext.Type.POST_PROCESSING); -// Assert.assertEquals(adminView.getTotalEnqueueCount(), 18); - Assert.assertEquals(adminView.getTotalDequeueCount(), 0); Assert.assertEquals(adminView.getTotalConsumerCount(), 3); } catch (Exception e) { 
Assert.fail("This should not have thrown an exception.", e); @@ -265,9 +246,6 @@ public class JMSMessageConsumerTest { sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB); final BrokerView adminView = broker.getAdminView(); - - Assert.assertEquals(adminView.getTotalDequeueCount(), 0); -// Assert.assertEquals(adminView.getTotalEnqueueCount(), 10); Assert.assertEquals(adminView.getTotalConsumerCount(), 2); // Async operations. Give some time for messages to be processed. @@ -283,9 +261,6 @@ public class JMSMessageConsumerTest { sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.COORDINATOR_ACTION); final BrokerView adminView = broker.getAdminView(); - - Assert.assertEquals(adminView.getTotalDequeueCount(), 0); -// Assert.assertEquals(adminView.getTotalEnqueueCount(), 12); Assert.assertEquals(adminView.getTotalConsumerCount(), 2); // Async operations. Give some time for messages to be processed. @@ -303,24 +278,4 @@ public class JMSMessageConsumerTest { broker.stop(); subscriber.closeSubscriber(); } - - @Test - public void testJMSMessagesFromOozieForNonFalconWF() throws Exception { - sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB, false /* isFalconWF */); - - final BrokerView adminView = broker.getAdminView(); - - Assert.assertEquals(adminView.getTotalDequeueCount(), 0); - Assert.assertEquals(adminView.getTotalEnqueueCount(), 10); - Assert.assertEquals(adminView.getTotalConsumerCount(), 2); - Assert.assertEquals(adminView.getTotalMessageCount(), 0); - - Thread.sleep(100); - Mockito.verify(jobEndService, Mockito.never()).notifyStart(Mockito.any(WorkflowExecutionContext.class)); - Mockito.verify(jobEndService, Mockito.never()).notifySuccess(Mockito.any(WorkflowExecutionContext.class)); - Mockito.verify(jobEndService, Mockito.never()).notifySuspend(Mockito.any(WorkflowExecutionContext.class)); - Mockito.verify(jobEndService, Mockito.never()).notifyWait(Mockito.any(WorkflowExecutionContext.class)); - Mockito.verify(jobEndService, 
Mockito.never()).notifyFailure(Mockito.any(WorkflowExecutionContext.class)); - } - } http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/metrics/pom.xml ---------------------------------------------------------------------- diff --git a/metrics/pom.xml b/metrics/pom.xml index 48cf80d..6266d0d 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.falcon</groupId> <artifactId>falcon-main</artifactId> - <version>0.10-SNAPSHOT</version> + <version>0.10</version> </parent> <artifactId>falcon-metrics</artifactId> <description>Apache Falcon Metrics</description> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie-el-extensions/pom.xml ---------------------------------------------------------------------- diff --git a/oozie-el-extensions/pom.xml b/oozie-el-extensions/pom.xml index 71ae45d..afbb2e3 100644 --- a/oozie-el-extensions/pom.xml +++ b/oozie-el-extensions/pom.xml @@ -24,7 +24,7 @@ <parent> <groupId>org.apache.falcon</groupId> <artifactId>falcon-main</artifactId> - <version>0.10-SNAPSHOT</version> + <version>0.10</version> </parent> <artifactId>falcon-oozie-el-extension</artifactId> <description>Apache Falcon Oozie EL Extension</description> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java ---------------------------------------------------------------------- diff --git a/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java b/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java index a6ff487..f0cb7cd 100644 --- a/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java +++ b/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java @@ -100,7 +100,7 @@ public final class OozieELExtensions { String emptyDir = (String) eval.getVariable(dataInName + ".empty-dir"); 
XLog.getLog(OozieELExtensions.class).debug("No instances could be resolved. Passing empty dir : " + emptyDir); - uristr = emptyDir; + return emptyDir; } } catch (Exception e) { throw new RuntimeException("Failed to resolve instance range for " + dataInName, e); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java ---------------------------------------------------------------------- diff --git a/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java b/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java index b9bf594..2be8603 100644 --- a/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java +++ b/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java @@ -168,6 +168,8 @@ public class TestOozieELExtensions { "*/US", "_DONE", }, // With availability flag. All instances missing {"hdfs://localhost:8020/projects/falcon/staging/EMPTY_DIR_DONT_DELETE", "null", "_FINISH"}, + // With availability flag and partitions. All instances missing + {"hdfs://localhost:8020/projects/falcon/staging/EMPTY_DIR_DONT_DELETE", "*", "_FINISH"}, // No availability flag. One instance missing {"hdfs://localhost:8020/clicks/2009/09/02/09", "null", ""}, // With availability flag. One instance missing. 
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie/pom.xml ---------------------------------------------------------------------- diff --git a/oozie/pom.xml b/oozie/pom.xml index c83daf6..a784c5a 100644 --- a/oozie/pom.xml +++ b/oozie/pom.xml @@ -24,7 +24,7 @@ <parent> <groupId>org.apache.falcon</groupId> <artifactId>falcon-main</artifactId> - <version>0.10-SNAPSHOT</version> + <version>0.10</version> </parent> <artifactId>falcon-oozie-adaptor</artifactId> <description>Apache Falcon Oozie Adaptor Module</description> http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java index b0e46f0..07d293c 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java @@ -275,11 +275,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F Path scriptPath = new Path(buildPath, "scripts"); copyHiveScript(fs, scriptPath, IMPORT_HQL); copyHiveScript(fs, scriptPath, EXPORT_HQL); - - // create hive conf to stagingDir - Path confPath = new Path(buildPath + "/conf"); - persistHiveConfiguration(fs, confPath, srcCluster, "falcon-source-"); - persistHiveConfiguration(fs, confPath, trgCluster, "falcon-target-"); } catch (IOException e) { throw new FalconException("Unable to create hive conf files", e); } http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java index 5a62130..010446b 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java @@ -24,7 +24,6 @@ import org.apache.falcon.Tag; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.HiveUtil; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; @@ -79,11 +78,7 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW marshal(cluster, workflow, buildPath); Properties props = getProperties(buildPath, wfName); props.putAll(createDefaultConfiguration(cluster)); - if (EntityUtil.isTableStorageType(cluster, entity)) { - // todo: kludge send source hcat creds for coord dependency check to pass - props.putAll(HiveUtil.getHiveCredentials(srcCluster)); - props.putAll(HiveUtil.getHiveCredentials(cluster)); - } + props.putAll(getWorkflowProperties(entity)); props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle())); // Write out the config to config-default.xml http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java index 629485d..3da97d3 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java +++ 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java @@ -22,15 +22,18 @@ import org.apache.falcon.FalconException; import org.apache.falcon.Tag; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.HiveUtil; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.oozie.workflow.CONFIGURATION; import org.apache.falcon.util.OozieUtils; import org.apache.falcon.workflow.WorkflowExecutionArgs; import javax.xml.bind.JAXBElement; import java.util.Arrays; +import java.util.Map; import java.util.Properties; /** @@ -60,6 +63,15 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild //Add pre-processing if (shouldPreProcess()) { ACTION action = getPreProcessingAction(false, Tag.REPLICATION); + Properties hiveConf = HiveUtil.getHiveCredentials(src); + for (Map.Entry<Object, Object> e : hiveConf.entrySet()) { + CONFIGURATION.Property prop = new CONFIGURATION.Property(); + prop.setName((String) e.getKey()); + prop.setValue((String) e.getValue()); + LOG.info("Adding config to replication hive preprocessing action : key = {} value = {}", + e.getKey(), e.getValue()); + action.getJava().getConfiguration().getProperty().add(prop); + } addHDFSServersConfig(action, src, target); addTransition(action, EXPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(action); @@ -72,6 +84,16 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild OozieUtils.unMarshalHiveAction(export); org.apache.falcon.oozie.hive.ACTION hiveExportAction = exportActionJaxbElement.getValue(); addHDFSServersConfig(hiveExportAction, src, target); + Properties hiveConf = HiveUtil.getHiveCredentials(src); + for (Map.Entry<Object, Object> e : 
hiveConf.entrySet()) { + org.apache.falcon.oozie.hive.CONFIGURATION.Property prop = + new org.apache.falcon.oozie.hive.CONFIGURATION.Property(); + prop.setName((String) e.getKey()); + prop.setValue((String) e.getValue()); + LOG.info("Adding config to replication hive export action : key = {} value = {}", + e.getKey(), e.getValue()); + hiveExportAction.getConfiguration().getProperty().add(prop); + } OozieUtils.marshalHiveAction(export, exportActionJaxbElement); addTransition(export, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(export); @@ -89,6 +111,16 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild OozieUtils.unMarshalHiveAction(importAction); org.apache.falcon.oozie.hive.ACTION hiveImportAction = importActionJaxbElement.getValue(); addHDFSServersConfig(hiveImportAction, src, target); + Properties hiveConf2 = HiveUtil.getHiveCredentials(target); + for (Map.Entry<Object, Object> e : hiveConf2.entrySet()) { + org.apache.falcon.oozie.hive.CONFIGURATION.Property prop = + new org.apache.falcon.oozie.hive.CONFIGURATION.Property(); + prop.setName((String) e.getKey()); + prop.setValue((String) e.getValue()); + LOG.info("Adding config to replication hive import action : key = {} value = {}", + e.getKey(), e.getValue()); + hiveImportAction.getConfiguration().getProperty().add(prop); + } OozieUtils.marshalHiveAction(importAction, importActionJaxbElement); addTransition(importAction, CLEANUP_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(importAction); @@ -133,8 +165,8 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild (org.apache.falcon.oozie.workflow.ACTION) object; String actionName = action.getName(); if (PREPROCESS_ACTION_NAME.equals(actionName)) { + // add reference to hive-site conf to each action - action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml"); if (isSecurityEnabled) { // add a 
reference to credential in the action action.setCred(SOURCE_HIVE_CREDENTIAL_NAME); http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java index dc5a491..51db75d 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java @@ -30,12 +30,10 @@ import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.entity.v0.process.Output; import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.oozie.spark.CONFIGURATION.Property; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.CONFIGURATION; import org.apache.falcon.util.OozieUtils; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import javax.xml.bind.JAXBElement; @@ -46,6 +44,7 @@ import java.util.List; */ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder { private static final String ACTION_TEMPLATE = "/action/process/spark-action.xml"; + private static final String FALCON_PREFIX = "falcon_"; public SparkProcessWorkflowBuilder(Process entity) { super(entity); @@ -58,7 +57,7 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue(); String sparkMasterURL = entity.getSparkAttributes().getMaster(); - String sparkFilePath = entity.getSparkAttributes().getJar(); + Path sparkJarFilePath = new 
Path(entity.getSparkAttributes().getJar()); String sparkJobName = entity.getSparkAttributes().getName(); String sparkOpts = entity.getSparkAttributes().getSparkOpts(); String sparkClassName = entity.getSparkAttributes().getClazz(); @@ -89,21 +88,32 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder argList.addAll(sparkArgs); } - addInputFeedsAsArgument(argList, cluster); + //Adding output first so that final order must have input and then output followed by user's arguments. addOutputFeedsAsArgument(argList, cluster); + addInputFeedsAsArgument(argList, cluster); - sparkAction.setJar(addUri(sparkFilePath, cluster)); - - setSparkLibFileToWorkflowLib(sparkFilePath, entity); + // In Oozie spark action, value for jar is either Java jar file path or Python file path. + validateSparkJarFilePath(sparkJarFilePath); + sparkAction.setJar(sparkJarFilePath.getName()); + setSparkLibFileToWorkflowLib(sparkJarFilePath.toString(), entity); propagateEntityProperties(sparkAction); OozieUtils.marshalSparkAction(action, actionJaxbElement); return action; } - private void setSparkLibFileToWorkflowLib(String sparkFile, Process entity) { + private void setSparkLibFileToWorkflowLib(String sparkJarFilePath, Process entity) { if (StringUtils.isEmpty(entity.getWorkflow().getLib())) { - entity.getWorkflow().setLib(sparkFile); + entity.getWorkflow().setLib(sparkJarFilePath); + } else { + String workflowLib = entity.getWorkflow().getLib() + "," + sparkJarFilePath; + entity.getWorkflow().setLib(workflowLib); + } + } + + private void validateSparkJarFilePath(Path sparkJarFilePath) throws FalconException { + if (!sparkJarFilePath.isAbsolute()) { + throw new FalconException("Spark jar file path must be absolute:"+sparkJarFilePath); } } @@ -145,6 +155,7 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder return; } + //Adding to the 0th index and getting the args shifted as arguments are added to get the desired effect. 
int numInputFeed = entity.getInputs().getInputs().size(); while (numInputFeed > 0) { Input input = entity.getInputs().getInputs().get(numInputFeed-1); @@ -153,6 +164,10 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder final String inputName = input.getName(); if (storage.getType() == Storage.TYPE.FILESYSTEM) { argList.add(0, "${" + inputName + "}"); + } else if (storage.getType() == Storage.TYPE.TABLE) { + argList.add(0, "${" + FALCON_PREFIX+inputName+"_database" + "}"); + argList.add(0, "${" + FALCON_PREFIX+inputName+"_table" + "}"); + argList.add(0, "${" + FALCON_PREFIX+inputName+"_partition_filter_hive" + "}"); } numInputFeed--; } @@ -163,26 +178,24 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder return; } - for(Output output : entity.getOutputs().getOutputs()) { + //Adding to the 0th index and getting the args shifted as arguments are added to get the desired effect. + int numOutputFeed = entity.getOutputs().getOutputs().size(); + while (numOutputFeed > 0) { + Output output = entity.getOutputs().getOutputs().get(numOutputFeed-1); Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed()); Storage storage = FeedHelper.createStorage(cluster, feed); final String outputName = output.getName(); if (storage.getType() == Storage.TYPE.FILESYSTEM) { - argList.add(argList.size(), "${" + outputName + "}"); + argList.add(0, "${" + outputName + "}"); + } else if (storage.getType() == Storage.TYPE.TABLE) { + argList.add(0, "${" + FALCON_PREFIX+outputName+"_database" + "}"); + argList.add(0, "${" + FALCON_PREFIX+outputName+"_table" + "}"); + argList.add(0, "${" + FALCON_PREFIX+outputName+"_partitions_hive" + "}"); } + numOutputFeed--; } } - private String addUri(String jarFile, Cluster cluster) throws FalconException { - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - ClusterHelper.getConfiguration(cluster)); - Path jarFilePath = new Path(jarFile); - if 
(jarFilePath.isAbsoluteAndSchemeAuthorityNull()) { - return fs.makeQualified(jarFilePath).toString(); - } - return jarFile; - } - private String getClusterEntitySparkMaster(Cluster cluster) { return ClusterHelper.getSparkMasterEndPoint(cluster); } http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java index 4961896..ea914f6 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java @@ -48,6 +48,14 @@ public class FalconPostProcessing extends Configured implements Tool { // serialize the context to HDFS under logs dir before sending the message context.serialize(); + boolean systemNotificationEnabled = Boolean.parseBoolean(context. + getValue(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, "true")); + + if (systemNotificationEnabled) { + LOG.info("Sending Falcon message {} ", context); + invokeFalconMessageProducer(context); + } + String userBrokerUrl = context.getValue(WorkflowExecutionArgs.USER_BRKR_URL); boolean userNotificationEnabled = Boolean.parseBoolean(context. 
getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, "true")); @@ -72,6 +80,13 @@ public class FalconPostProcessing extends Configured implements Tool { jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS); } + private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception { + JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context) + .type(JMSMessageProducer.MessageType.FALCON) + .build(); + jmsMessageProducer.sendMessage(); + } + private void invokeLogProducer(WorkflowExecutionContext context) { // todo: need to move this out to Falcon in-process if (UserGroupInformation.isSecurityEnabled()) {
