IGNITE-3710 Upgrade ignite-spark module to Spark 2.0 (cherry picked from commit b78d354)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e3d7326a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e3d7326a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e3d7326a Branch: refs/heads/master Commit: e3d7326ac8eb848666c87dcaf17361860dd5b73d Parents: 94dab7b Author: Evgenii Zhuravlev <[email protected]> Authored: Wed Feb 22 16:16:32 2017 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Wed Feb 22 16:17:36 2017 +0300 ---------------------------------------------------------------------- DEVNOTES.txt | 2 +- examples/pom.xml | 14 ++ .../examples/java8/spark/SharedRDDExample.java | 110 ---------------- .../ignite/examples/spark/SharedRDDExample.java | 127 +++++++++++++++++++ .../ignite/examples/spark/package-info.java | 22 ++++ .../examples/SharedRDDExampleSelfTest.java | 36 ------ .../IgniteExamplesJ8SelfTestSuite.java | 2 - .../examples/SharedRDDExampleSelfTest.java | 36 ++++++ .../IgniteExamplesSparkSelfTestSuite.java | 46 +++++++ 9 files changed, 246 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e3d7326a/DEVNOTES.txt ---------------------------------------------------------------------- diff --git a/DEVNOTES.txt b/DEVNOTES.txt index 6a275fc..79bedfd 100644 --- a/DEVNOTES.txt +++ b/DEVNOTES.txt @@ -33,7 +33,7 @@ Ignite Hadoop Accelerator Maven Build Instructions mvn clean package -DskipTests -Dignite.edition=hadoop [-Dhadoop.version=X.X.X] [-Dspark.version=x.y.z] Use 'hadoop.version' parameter to build Ignite against a specific Hadoop version. -Use 'spark.version' parameter to build ignite-spark module for a specific Spark version. +Use 'spark.version' parameter to build ignite-spark module for a specific Spark version. Version should be >= 2.0.0. Look for apache-ignite-hadoop-<version>-bin.zip in ./target/bin directory. 
Resulting binary assembly will also include integration module for Apache Spark. http://git-wip-us.apache.org/repos/asf/ignite/blob/e3d7326a/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index 1c4ad25..e7fd67d 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -112,6 +112,8 @@ <!-- will be changed by profile activation. allows to combine profiles. --> <lgpl.folder>src/main/java</lgpl.folder> <java8.folder>src/main/java</java8.folder> + <spark.folder>src/main/java</spark.folder> + <spark.test.folder>src/test/java</spark.test.folder> <lgpl.test.folder>src/test/java</lgpl.test.folder> <java8.test.folder>src/test/java</java8.test.folder> </properties> @@ -120,6 +122,11 @@ <profile> <id>scala</id> + <properties> + <spark.folder>src/main/spark</spark.folder> + <spark.test.folder>src/test/spark</spark.test.folder> + </properties> + <dependencies> <dependency> <groupId>org.apache.ignite</groupId> @@ -166,6 +173,11 @@ <profile> <id>scala-2.10</id> + <properties> + <spark.folder>src/main/spark</spark.folder> + <spark.test.folder>src/test/spark</spark.test.folder> + </properties> + <dependencies> <dependency> <groupId>org.apache.ignite</groupId> @@ -273,6 +285,7 @@ <source>schema-import/src/main/java</source> <source>${lgpl.folder}</source> <source>${java8.folder}</source> + <source>${spark.folder}</source> </sources> </configuration> </execution> @@ -286,6 +299,7 @@ <configuration> <sources> <source>${lgpl.test.folder}</source> + <source>${spark.test.folder}</source> <source>${java8.test.folder}</source> </sources> </configuration> http://git-wip-us.apache.org/repos/asf/ignite/blob/e3d7326a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java 
b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java deleted file mode 100644 index 5f74a94..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.java8.spark; - -import org.apache.ignite.spark.JavaIgniteContext; -import org.apache.ignite.spark.JavaIgniteRDD; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.Dataset; -import scala.Tuple2; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -/** - * This example demonstrates how to create an JavaIgnitedRDD and share it with multiple spark workers. The goal of this - * particular example is to provide the simplest code example of this logic. 
- * <p> - * This example will start Ignite in the embedded mode and will start an JavaIgniteContext on each Spark worker node. - * <p> - * The example can work in the standalone mode as well that can be enabled by setting JavaIgniteContext's - * {@code standalone} property to {@code true} and running an Ignite node separately with - * `examples/config/spark/example-shared-rdd.xml` config. - */ -public class SharedRDDExample { - /** - * Executes the example. - * @param args Command line arguments, none required. - */ - public static void main(String args[]) { - // Spark Configuration. - SparkConf sparkConf = new SparkConf() - .setAppName("JavaIgniteRDDExample") - .setMaster("local") - .set("spark.executor.instances", "2"); - - // Spark context. - JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); - - // Adjust the logger to exclude the logs of no interest. - Logger.getRootLogger().setLevel(Level.ERROR); - Logger.getLogger("org.apache.ignite").setLevel(Level.INFO); - - // Creates Ignite context with specific configuration and runs Ignite in the embedded mode. - JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>( - sparkContext,"examples/config/spark/example-shared-rdd.xml", false); - - // Create a Java Ignite RDD of Type (Int,Int) Integer Pair. - JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD"); - - // Define data to be stored in the Ignite RDD (cache). - List<Integer> data = IntStream.range(0, 20).boxed().collect(Collectors.toList()); - - // Preparing a Java RDD. - JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data); - - // Fill the Ignite RDD in with Int pairs. Here Pairs are represented as Scala Tuple2. 
- sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() { - @Override public Tuple2<Integer, Integer> call(Integer val) throws Exception { - return new Tuple2<Integer, Integer>(val, val); - } - })); - - System.out.println(">>> Iterating over Ignite Shared RDD..."); - - // Iterate over the Ignite RDD. - sharedRDD.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")")); - - System.out.println(">>> Transforming values stored in Ignite Shared RDD..."); - - // Filter out even values as a transformed RDD. - JavaPairRDD<Integer, Integer> transformedValues = - sharedRDD.filter((Tuple2<Integer, Integer> pair) -> pair._2() % 2 == 0); - - // Print out the transformed values. - transformedValues.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")")); - - System.out.println(">>> Executing SQL query over Ignite Shared RDD..."); - - // Execute SQL query over the Ignite RDD. - Dataset df = sharedRDD.sql("select _val from Integer where _key < 9"); - - // Show the result of the execution. - df.show(); - - // Close IgniteContext on all the workers. - igniteContext.close(true); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e3d7326a/examples/src/main/spark/org/apache/ignite/examples/spark/SharedRDDExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/spark/org/apache/ignite/examples/spark/SharedRDDExample.java b/examples/src/main/spark/org/apache/ignite/examples/spark/SharedRDDExample.java new file mode 100644 index 0000000..d8e5f16 --- /dev/null +++ b/examples/src/main/spark/org/apache/ignite/examples/spark/SharedRDDExample.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.spark; + +import org.apache.ignite.spark.JavaIgniteContext; +import org.apache.ignite.spark.JavaIgniteRDD; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.sql.Dataset; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.List; + +/** + * This example demonstrates how to create a JavaIgniteRDD and share it with multiple Spark workers. The goal of this + * particular example is to provide the simplest code example of this logic. + * <p> + * This example will start Ignite in the embedded mode and will start a JavaIgniteContext on each Spark worker node. + * <p> + * The example can work in the standalone mode as well that can be enabled by setting JavaIgniteContext's + * {@code standalone} property to {@code true} and running an Ignite node separately with + * `examples/config/spark/example-shared-rdd.xml` config. + */ +public class SharedRDDExample { + /** + * Executes the example. + * @param args Command line arguments, none required. 
+ */ + public static void main(String args[]) { + // Spark Configuration. + SparkConf sparkConf = new SparkConf() + .setAppName("JavaIgniteRDDExample") + .setMaster("local") + .set("spark.executor.instances", "2"); + + // Spark context. + JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); + + // Adjust the logger to exclude the logs of no interest. + Logger.getRootLogger().setLevel(Level.ERROR); + Logger.getLogger("org.apache.ignite").setLevel(Level.INFO); + + // Creates Ignite context with specific configuration and runs Ignite in the embedded mode. + JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>( + sparkContext,"examples/config/spark/example-shared-rdd.xml", false); + + // Create a Java Ignite RDD of Type (Int,Int) Integer Pair. + JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD"); + + // Define data to be stored in the Ignite RDD (cache). + List<Integer> data = new ArrayList<>(20); + + for (int i = 0; i<20; i++) { + data.add(i); + } + + // Preparing a Java RDD. + JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data); + + // Fill the Ignite RDD in with Int pairs. Here Pairs are represented as Scala Tuple2. + sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() { + @Override public Tuple2<Integer, Integer> call(Integer val) throws Exception { + return new Tuple2<Integer, Integer>(val, val); + } + })); + + System.out.println(">>> Iterating over Ignite Shared RDD..."); + + // Iterate over the Ignite RDD. + sharedRDD.foreach(new VoidFunction<Tuple2<Integer, Integer>>() { + @Override public void call(Tuple2<Integer, Integer> tuple) throws Exception { + System.out.println("(" + tuple._1 + "," + tuple._2 + ")"); + } + }); + + System.out.println(">>> Transforming values stored in Ignite Shared RDD..."); + + // Filter out even values as a transformed RDD. 
+ JavaPairRDD<Integer, Integer> transformedValues = + sharedRDD.filter(new Function<Tuple2<Integer, Integer>, Boolean>() { + @Override public Boolean call(Tuple2<Integer, Integer> tuple) throws Exception { + return tuple._2() % 2 == 0; + } + }); + + // Print out the transformed values. + transformedValues.foreach(new VoidFunction<Tuple2<Integer, Integer>>() { + @Override public void call(Tuple2<Integer, Integer> tuple) throws Exception { + System.out.println("(" + tuple._1 + "," + tuple._2 + ")"); + } + }); + + System.out.println(">>> Executing SQL query over Ignite Shared RDD..."); + + // Execute SQL query over the Ignite RDD. + Dataset df = sharedRDD.sql("select _val from Integer where _key < 9"); + + // Show the result of the execution. + df.show(); + + // Close IgniteContext on all the workers. + igniteContext.close(true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e3d7326a/examples/src/main/spark/org/apache/ignite/examples/spark/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/spark/org/apache/ignite/examples/spark/package-info.java b/examples/src/main/spark/org/apache/ignite/examples/spark/package-info.java new file mode 100644 index 0000000..f97a7b7 --- /dev/null +++ b/examples/src/main/spark/org/apache/ignite/examples/spark/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Basic examples for ignite functionality with spark. + */ +package org.apache.ignite.examples.spark; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e3d7326a/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java b/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java deleted file mode 100644 index 0fafb4d..0000000 --- a/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.java8.examples; - -import org.apache.ignite.examples.java8.spark.SharedRDDExample; -import org.junit.Test; - -/** - * SharedRDD examples self test. - */ -public class SharedRDDExampleSelfTest { - static final String[] EMPTY_ARGS = new String[0]; - /** - * @throws Exception If failed. - */ - @Test - public void testSharedRDDExample() throws Exception { - SharedRDDExample.main(EMPTY_ARGS); - } - -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e3d7326a/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java b/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java index c32339f..7b62ea8 100644 --- a/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java +++ b/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java @@ -26,7 +26,6 @@ import org.apache.ignite.java8.examples.EventsExamplesMultiNodeSelfTest; import org.apache.ignite.java8.examples.EventsExamplesSelfTest; import org.apache.ignite.java8.examples.IndexingBridgeMethodTest; import org.apache.ignite.java8.examples.MessagingExamplesSelfTest; -import org.apache.ignite.java8.examples.SharedRDDExampleSelfTest; import org.apache.ignite.testframework.GridTestUtils; import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP; @@ -50,7 +49,6 @@ public class IgniteExamplesJ8SelfTestSuite extends TestSuite { suite.addTest(new TestSuite(IndexingBridgeMethodTest.class)); suite.addTest(new TestSuite(CacheExamplesSelfTest.class)); suite.addTest(new TestSuite(BasicExamplesSelfTest.class)); - suite.addTest(new TestSuite(SharedRDDExampleSelfTest.class)); // suite.addTest(new TestSuite(ContinuationExamplesSelfTest.class)); // suite.addTest(new 
TestSuite(ContinuousMapperExamplesSelfTest.class)); http://git-wip-us.apache.org/repos/asf/ignite/blob/e3d7326a/examples/src/test/spark/org/apache/ignite/spark/examples/SharedRDDExampleSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/spark/org/apache/ignite/spark/examples/SharedRDDExampleSelfTest.java b/examples/src/test/spark/org/apache/ignite/spark/examples/SharedRDDExampleSelfTest.java new file mode 100644 index 0000000..038202e --- /dev/null +++ b/examples/src/test/spark/org/apache/ignite/spark/examples/SharedRDDExampleSelfTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.examples; + +import org.apache.ignite.examples.spark.SharedRDDExample; +import org.junit.Test; + +/** + * SharedRDD examples self test. + */ +public class SharedRDDExampleSelfTest { + static final String[] EMPTY_ARGS = new String[0]; + /** + * @throws Exception If failed. 
+ */ + @Test + public void testSharedRDDExample() throws Exception { + SharedRDDExample.main(EMPTY_ARGS); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e3d7326a/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java b/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java new file mode 100644 index 0000000..73b286a --- /dev/null +++ b/examples/src/test/spark/org/apache/ignite/spark/testsuites/IgniteExamplesSparkSelfTestSuite.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.spark.examples.SharedRDDExampleSelfTest; +import org.apache.ignite.testframework.GridTestUtils; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP; + +/** + * Examples test suite. + * <p> + * Contains only Spark ignite examples tests. 
+ */ +public class IgniteExamplesSparkSelfTestSuite extends TestSuite { + /** + * @return Suite. + * @throws Exception If failed. + */ + public static TestSuite suite() throws Exception { + System.setProperty(IGNITE_OVERRIDE_MCAST_GRP, + GridTestUtils.getNextMulticastGroup(IgniteExamplesSparkSelfTestSuite.class)); + + TestSuite suite = new TestSuite("Ignite Examples Test Suite"); + + suite.addTest(new TestSuite(SharedRDDExampleSelfTest.class)); + + return suite; + } +}
