IGNITE-4526: Add Spark Shared RDD examples Reviewed by Denis Magda <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b461cb47
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b461cb47
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b461cb47

Branch: refs/heads/ignite-comm-balance-master
Commit: b461cb47882861356ede58775bd9e253dcf26202
Parents: 79e1e53
Author: Manish Mishra <[email protected]>
Authored: Tue Feb 14 16:54:11 2017 -0800
Committer: Denis Magda <[email protected]>
Committed: Tue Feb 14 16:54:11 2017 -0800

----------------------------------------------------------------------
 examples/config/spark/example-shared-rdd.xml    |  83 ++++++++++++++
 examples/pom.xml                                |  27 ++++-
 .../examples/java8/spark/SharedRDDExample.java  | 110 +++++++++++++++++++
 .../examples/spark/ScalarSharedRDDExample.scala |  89 +++++++++++++++
 .../examples/SharedRDDExampleSelfTest.java      |  36 ++++++
 .../IgniteExamplesJ8SelfTestSuite.java          |   3 +
 .../tests/examples/ScalarExamplesSelfTest.scala |   6 +
 .../apache/ignite/spark/JavaIgniteContext.scala |   6 +
 8 files changed, 359 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/config/spark/example-shared-rdd.xml
----------------------------------------------------------------------
diff --git a/examples/config/spark/example-shared-rdd.xml b/examples/config/spark/example-shared-rdd.xml
new file mode 100644
index 0000000..83de6a3
--- /dev/null
+++ b/examples/config/spark/example-shared-rdd.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    Ignite Spring configuration file to start up the Ignite cache used by the Spark Shared RDD examples.
+
+    This file demonstrates how to configure cache using Spring. Provided cache
+    will be created on node startup.
+
+    When starting a standalone node, you need to execute the following command:
+    {IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/spark/example-shared-rdd.xml
+
+    When starting Ignite from Java IDE, pass path to this file to Ignition:
+    Ignition.start("examples/config/spark/example-shared-rdd.xml");
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="cacheConfiguration">
+            <!-- SharedRDD cache example configuration (Atomic mode). -->
+            <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                <!-- Set a cache name. -->
+                <property name="name" value="sharedRDD"/>
+                <!-- Set a cache mode. -->
+                <property name="cacheMode" value="PARTITIONED"/>
+                <!-- Index Integer pairs used in the example. -->
+                <property name="indexedTypes">
+                    <list>
+                        <value>java.lang.Integer</value>
+                        <value>java.lang.Integer</value>
+                    </list>
+                </property>
+                <!-- Set atomicity mode. -->
+                <property name="atomicityMode" value="ATOMIC"/>
+                <!-- Configure a number of backups. -->
+                <property name="backups" value="1"/>
+            </bean>
+        </property>
+
+        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <!--
+                        Ignite provides several options for automatic discovery that can be used
+                        instead of static IP based discovery. For information on all options refer
+                        to our documentation: http://apacheignite.readme.io/docs/cluster-config
+                    -->
+                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
+                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <!-- In distributed environment, replace with actual host IP address. -->
+                                <value>127.0.0.1:47500..47509</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 3a6a026..1c4ad25 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -17,7 +17,8 @@
   limitations under the License.
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> @@ -138,6 +139,18 @@ </exclusion> </exclusions> </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-spark</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + <version>3.2.9.Final</version> + </dependency> </dependencies> <build> @@ -172,6 +185,18 @@ </exclusion> </exclusions> </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-spark_2.10</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + <version>3.2.9.Final</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java new file mode 100644 index 0000000..392180d --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.spark; + +import org.apache.ignite.spark.JavaIgniteContext; +import org.apache.ignite.spark.JavaIgniteRDD; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.DataFrame; +import scala.Tuple2; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * This example demonstrates how to create an JavaIgnitedRDD and share it with multiple spark workers. The goal of this + * particular example is to provide the simplest code example of this logic. + * <p> + * This example will start Ignite in the embedded mode and will start an JavaIgniteContext on each Spark worker node. + * <p> + * The example can work in the standalone mode as well that can be enabled by setting JavaIgniteContext's + * {@code standalone} property to {@code true} and running an Ignite node separately with + * `examples/config/spark/example-shared-rdd.xml` config. + */ +public class SharedRDDExample { + /** + * Executes the example. 
+ * @param args Command line arguments, none required. + */ + public static void main(String args[]) { + // Spark Configuration. + SparkConf sparkConf = new SparkConf() + .setAppName("JavaIgniteRDDExample") + .setMaster("local") + .set("spark.executor.instances", "2"); + + // Spark context. + JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); + + // Adjust the logger to exclude the logs of no interest. + Logger.getRootLogger().setLevel(Level.ERROR); + Logger.getLogger("org.apache.ignite").setLevel(Level.INFO); + + // Creates Ignite context with specific configuration and runs Ignite in the embedded mode. + JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>( + sparkContext,"examples/config/spark/example-shared-rdd.xml", false); + + // Create a Java Ignite RDD of Type (Int,Int) Integer Pair. + JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD"); + + // Define data to be stored in the Ignite RDD (cache). + List<Integer> data = IntStream.range(0, 20).boxed().collect(Collectors.toList()); + + // Preparing a Java RDD. + JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data); + + // Fill the Ignite RDD in with Int pairs. Here Pairs are represented as Scala Tuple2. + sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() { + @Override public Tuple2<Integer, Integer> call(Integer val) throws Exception { + return new Tuple2<Integer, Integer>(val, val); + } + })); + + System.out.println(">>> Iterating over Ignite Shared RDD..."); + + // Iterate over the Ignite RDD. + sharedRDD.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")")); + + System.out.println(">>> Transforming values stored in Ignite Shared RDD..."); + + // Filter out even values as a transformed RDD. 
+ JavaPairRDD<Integer, Integer> transformedValues = + sharedRDD.filter((Tuple2<Integer, Integer> pair) -> pair._2() % 2 == 0); + + // Print out the transformed values. + transformedValues.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")")); + + System.out.println(">>> Executing SQL query over Ignite Shared RDD..."); + + // Execute SQL query over the Ignite RDD. + DataFrame df = sharedRDD.sql("select _val from Integer where _key < 9"); + + // Show the result of the execution. + df.show(); + + // Close IgniteContext on all the workers. + igniteContext.close(true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala new file mode 100644 index 0000000..18662e8 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.scalar.examples.spark
+
+import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+ * This example demonstrates how to create an IgniteRDD and share it with multiple spark workers.
+ * The goal of this particular example is to provide the simplest code example of this logic.
+ * <p>
+ * This example will start Ignite in the embedded mode and will start an IgniteContext on each Spark worker node.
+ * <p>
+ * The example can work in the standalone mode as well that can be enabled by setting IgniteContext's
+ * {@code standalone} property to {@code true} and running an Ignite node separately with
+ * `examples/config/spark/example-shared-rdd.xml` config.
+ * <p>
+ */
+object ScalarSharedRDDExample extends App {
+    // Spark Configuration.
+    private val conf = new SparkConf()
+        .setAppName("IgniteRDDExample")
+        .setMaster("local")
+        .set("spark.executor.instances", "2")
+
+    // Spark context.
+    val sparkContext = new SparkContext(conf)
+
+    // Adjust the logger to exclude the logs of no interest.
+    Logger.getRootLogger.setLevel(Level.ERROR)
+    Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
+
+    // Defines spring cache Configuration path.
+    private val CONFIG = "examples/config/spark/example-shared-rdd.xml"
+
+    // Creates Ignite context with above configuration.
+    val igniteContext = new IgniteContext(sparkContext, CONFIG, false)
+
+    // Creates an Ignite Shared RDD of Type (Int,Int) Integer Pair.
+    val sharedRDD: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD")
+
+    // Fill the Ignite Shared RDD in with Int pairs.
+    sharedRDD.savePairs(sparkContext.parallelize(1 to 100000, 10).map(i => (i, i)))
+
+    // Transform pairs to contain their squared value (as Long: x * x overflows Int for x > 46340).
+    val squaredPairs = sharedRDD.mapValues(x => x.toLong * x)
+
+    // Retrieve sharedRDD back from the Cache.
+    val transformedValues: IgniteRDD[Int, Int] = igniteContext.fromCache("sharedRDD")
+
+    // Take square roots of the squared values (the lazy mapValues above does not modify the cache).
+    val squareAndRootPair = squaredPairs.map { case (x, y) => (x, Math.sqrt(y.toDouble)) }
+
+    println(">>> Transforming values stored in Ignite Shared RDD...")
+
+    // Keep pairs whose square roots are less than 100 and
+    // take the first five elements from the transformed RDD and print them.
+    squareAndRootPair.filter(_._2 < 100.0).take(5).foreach(println)
+
+    println(">>> Executing SQL query over Ignite Shared RDD...")
+
+    // Execute a SQL query over the Ignite Shared RDD.
+    val df = transformedValues.sql("select _val from Integer where _val < 100 and _val > 9 ")
+
+    // Show ten rows from the result set.
+    df.show(10)
+
+    // Close IgniteContext on all workers.
+    igniteContext.close(true)
+
+    // Stop SparkContext.
+    sparkContext.stop()
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java b/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java
new file mode 100644
index 0000000..0fafb4d
--- /dev/null
+++ b/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.java8.examples;
+
+import org.apache.ignite.examples.java8.spark.SharedRDDExample;
+import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest;
+import org.junit.Test;
+
+/**
+ * SharedRDD example self test. Extends {@link GridAbstractExamplesTest} so JUnit 3 {@code TestSuite} discovers it.
+ */
+public class SharedRDDExampleSelfTest extends GridAbstractExamplesTest {
+    static final String[] EMPTY_ARGS = new String[0];
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSharedRDDExample() throws Exception {
+        SharedRDDExample.main(EMPTY_ARGS);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java b/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java
index 949324c..c32339f 100644
--- a/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java
+++ b/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.java8.examples.EventsExamplesMultiNodeSelfTest;
 import org.apache.ignite.java8.examples.EventsExamplesSelfTest;
 import org.apache.ignite.java8.examples.IndexingBridgeMethodTest;
 import org.apache.ignite.java8.examples.MessagingExamplesSelfTest;
+import org.apache.ignite.java8.examples.SharedRDDExampleSelfTest;
 import org.apache.ignite.testframework.GridTestUtils;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP;
@@ -49,6 +50,8 @@ public class IgniteExamplesJ8SelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(IndexingBridgeMethodTest.class));
         suite.addTest(new TestSuite(CacheExamplesSelfTest.class));
         suite.addTest(new TestSuite(BasicExamplesSelfTest.class));
+        suite.addTest(new TestSuite(SharedRDDExampleSelfTest.class));
+
 //        suite.addTest(new TestSuite(ContinuationExamplesSelfTest.class));
 //        suite.addTest(new TestSuite(ContinuousMapperExamplesSelfTest.class));
 //        suite.addTest(new TestSuite(DeploymentExamplesSelfTest.class));
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
index 94c41ad..28e509e 100644
--- a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
+++ b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
@@ -18,6 +18,7 @@
 package org.apache.ignite.scalar.tests.examples
 
 import org.apache.ignite.scalar.examples._
+import org.apache.ignite.scalar.examples.spark._
 import org.apache.ignite.scalar.scalar
 import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest
 import org.scalatest.junit.JUnitSuiteLike
@@ -95,4 +96,9 @@ class ScalarExamplesSelfTest extends GridAbstractExamplesTest with JUnitSuiteLik
     def testScalarSnowflakeSchemaExample() {
         ScalarSnowflakeSchemaExample.main(EMPTY_ARGS)
     }
+
+    /** */
+    def testScalarSharedRDDExample() {
+        ScalarSharedRDDExample.main(EMPTY_ARGS)
+    }
 }
http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
index 689a22d..d8a521b 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -51,6 +51,19 @@ class JavaIgniteContext[K, V](
         })
     }
 
+    /**
+     * Creates a context whose Ignite configuration is loaded from a Spring XML file.
+     *
+     * @param sc Java Spark context.
+     * @param springUrl Path (or URL) of the Spring XML configuration passed to `IgnitionEx.loadConfiguration`.
+     * @param standalone `true` to work with separately started Ignite nodes, `false` for the embedded mode.
+     */
+    def this(sc: JavaSparkContext, springUrl: String, standalone: Boolean) {
+        this(sc, new IgniteOutClosure[IgniteConfiguration] {
+            override def apply() = IgnitionEx.loadConfiguration(springUrl).get1()
+        }, standalone)
+    }
+
     def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
         JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null, false))
