IGNITE-3710 Upgrade ignite-spark module to Spark 2.0 (cherry picked from commit 8613c16)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/caf7b225 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/caf7b225 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/caf7b225 Branch: refs/heads/master Commit: caf7b225c6ae55a530961a3b79ebde2368b6a24d Parents: 5b94a7d Author: Evgenii Zhuravlev <[email protected]> Authored: Mon Feb 20 19:24:42 2017 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Feb 20 19:26:17 2017 +0300 ---------------------------------------------------------------------- .../examples/java8/spark/SharedRDDExample.java | 4 +- modules/spark-2.10/pom.xml | 54 ++++++ modules/spark/pom.xml | 183 ++++++++++++++++++- .../org/apache/ignite/spark/IgniteContext.scala | 22 ++- .../spark/JavaEmbeddedIgniteRDDSelfTest.java | 10 +- .../spark/JavaStandaloneIgniteRDDSelfTest.java | 22 +-- parent/pom.xml | 3 +- 7 files changed, 270 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java index 392180d..5f74a94 100644 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java @@ -26,7 +26,7 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; import scala.Tuple2; import 
java.util.List; @@ -99,7 +99,7 @@ public class SharedRDDExample { System.out.println(">>> Executing SQL query over Ignite Shared RDD..."); // Execute SQL query over the Ignite RDD. - DataFrame df = sharedRDD.sql("select _val from Integer where _key < 9"); + Dataset df = sharedRDD.sql("select _val from Integer where _key < 9"); // Show the result of the execution. df.show(); http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark-2.10/pom.xml ---------------------------------------------------------------------- diff --git a/modules/spark-2.10/pom.xml b/modules/spark-2.10/pom.xml index ed42227..58e2860 100644 --- a/modules/spark-2.10/pom.xml +++ b/modules/spark-2.10/pom.xml @@ -63,10 +63,64 @@ <dependency> <groupId>org.apache.spark</groupId> + <artifactId>spark-unsafe_2.10</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>${spark.version}</version> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson2.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-network-shuffle_2.10</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson2.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-tags_2.10</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-catalyst_2.10</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + 
<groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.json4s</groupId> + <artifactId>json4s-core_2.11</artifactId> + <version>3.5.0</version> + </dependency> + <!-- Test dependencies --> <dependency> http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark/pom.xml ---------------------------------------------------------------------- diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml index d8cb894..140f637 100644 --- a/modules/spark/pom.xml +++ b/modules/spark/pom.xml @@ -52,7 +52,7 @@ <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> - <version>2.11.7</version> + <version>2.11.8</version> </dependency> <dependency> @@ -63,11 +63,53 @@ <dependency> <groupId>org.apache.spark</groupId> + <artifactId>spark-catalyst_2.11</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-network-common_2.11</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-network-shuffle_2.11</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson2.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson2.version}</version> + </dependency> + + <dependency> + <groupId>org.json4s</groupId> + 
<artifactId>json4s-core_2.11</artifactId> + <version>3.5.0</version> + </dependency> + + <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-indexing</artifactId> <version>${project.version}</version> @@ -89,7 +131,7 @@ <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_2.11</artifactId> - <version>2.2.4</version> + <version>2.2.6</version> <scope>test</scope> <exclusions> <exclusion> @@ -98,6 +140,143 @@ </exclusion> </exclusions> </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-unsafe_2.11</artifactId> + <version>${spark.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-launcher_2.11</artifactId> + <version>${spark.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-tags_2.11</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-unsafe_2.10</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.5</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>4.0.29.Final</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>2.20</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.twitter</groupId> + <artifactId>chill_2.11</artifactId> + <version>0.8.1</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.codahale.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>3.0.2</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + 
<artifactId>metrics-json</artifactId> + <version>3.1.2</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.tomcat</groupId> + <artifactId>tomcat-servlet-api</artifactId> + <version>8.0.23</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.glassfish.jersey.containers</groupId> + <artifactId>jersey-container-servlet-core</artifactId> + <version>2.25</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson2.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.module</groupId> + <artifactId>jackson-module-scala_2.11</artifactId> + <version>${jackson2.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.xbean</groupId> + <artifactId>xbean-asm5-shaded</artifactId> + <version>4.5</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>net.jpountz.lz4</groupId> + <artifactId>lz4</artifactId> + <version>1.3.0</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>1.9.0</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + <version>3.0.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>commons-compiler</artifactId> + <version>3.0.0</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala 
---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala index 04139d1..842c459 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala @@ -22,7 +22,8 @@ import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} import org.apache.ignite.internal.IgnitionEx import org.apache.ignite.internal.util.IgniteUtils import org.apache.spark.sql.SQLContext -import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.SparkContext +import org.apache.log4j.Logger /** * Ignite context. @@ -34,7 +35,7 @@ class IgniteContext( @transient val sparkContext: SparkContext, cfgF: () ⇒ IgniteConfiguration, standalone: Boolean = true - ) extends Serializable with Logging { + ) extends Serializable { private val cfgClo = new Once(cfgF) private val igniteHome = IgniteUtils.getIgniteHome @@ -47,7 +48,7 @@ class IgniteContext( if (workers <= 0) throw new IllegalStateException("No Spark executors found to start Ignite nodes.") - logInfo("Will start Ignite nodes on " + workers + " workers") + Logging.log.info("Will start Ignite nodes on " + workers + " workers") // Start ignite server node on each worker in server mode. 
sparkContext.parallelize(1 to workers, workers).foreachPartition(it ⇒ ignite()) @@ -126,7 +127,7 @@ class IgniteContext( val home = IgniteUtils.getIgniteHome if (home == null && igniteHome != null) { - logInfo("Setting IGNITE_HOME from driver not as it is not available on this worker: " + igniteHome) + Logging.log.info("Setting IGNITE_HOME from driver not as it is not available on this worker: " + igniteHome) IgniteUtils.nullifyHomeDirectory() @@ -143,7 +144,7 @@ class IgniteContext( } catch { case e: IgniteException ⇒ - logError("Failed to start Ignite.", e) + Logging.log.error("Failed to start Ignite.", e) throw e } @@ -161,7 +162,7 @@ class IgniteContext( sparkContext.getExecutorStorageStatus.length) if (workers > 0) { - logInfo("Will stop Ignite nodes on " + workers + " workers") + Logging.log.info("Will stop Ignite nodes on " + workers + " workers") // Start ignite server node on each worker in server mode. sparkContext.parallelize(1 to workers, workers).foreachPartition(it ⇒ doClose()) @@ -200,3 +201,12 @@ private class Once(clo: () ⇒ IgniteConfiguration) extends Serializable { res } } + +/** + * Spark uses log4j by default. Using this logger in IgniteContext as well. + * + * This object is used to avoid problems with log4j serialization. 
+ */ +object Logging extends Serializable { + @transient lazy val log = Logger.getLogger(classOf[IgniteContext]) +} http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java index 0c4d556..53aff75 100644 --- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java +++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java @@ -35,7 +35,7 @@ import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.Column; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import scala.Tuple2; @@ -237,12 +237,12 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest { cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true); - DataFrame df = + Dataset<Row> df = cache.sql("select id, name, salary from Entity where name = ? 
and salary = ?", "name50", 5000); df.printSchema(); - Row[] res = df.collect(); + Row[] res = (Row[])df.collect(); assertEquals("Invalid result length", 1, res.length); assertEquals("Invalid result", 50, res[0].get(0)); @@ -251,11 +251,11 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest { Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000)); - DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp); + Dataset<Row> df0 = cache.sql("select id, name, salary from Entity").where(exp); df.printSchema(); - Row[] res0 = df0.collect(); + Row[] res0 = (Row[])df0.collect(); assertEquals("Invalid result length", 1, res0.length); assertEquals("Invalid result", 50, res0[0].get(0)); http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java index e9d97a4..1075f96 100644 --- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java +++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java @@ -35,7 +35,7 @@ import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.Column; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import scala.Tuple2; @@ -216,12 +216,12 @@ public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest { cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F)); - DataFrame df = + Dataset<Row> df = cache.sql("select id, name, salary from 
Entity where name = ? and salary = ?", "name50", 5000); df.printSchema(); - Row[] res = df.collect(); + Row[] res = (Row[])df.collect(); assertEquals("Invalid result length", 1, res.length); assertEquals("Invalid result", 50, res[0].get(0)); @@ -230,11 +230,11 @@ public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest { Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000)); - DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp); + Dataset<Row> df0 = cache.sql("select id, name, salary from Entity").where(exp); df.printSchema(); - Row[] res0 = df0.collect(); + Row[] res0 = (Row[])df0.collect(); assertEquals("Invalid result length", 1, res0.length); assertEquals("Invalid result", 50, res0[0].get(0)); @@ -269,25 +269,25 @@ public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest { Object val = GridTestUtils.getFieldValue(e, fieldName); - DataFrame df = cache.sql( + Dataset<Row> df = cache.sql( String.format("select %s from EntityTestAllTypeFields where %s = ?", fieldName, fieldName), val); if (val instanceof BigDecimal) { - Object res = df.collect()[0].get(0); + Object res = ((Row[])df.collect())[0].get(0); assertTrue(String.format("+++ Fail on %s field", fieldName), ((Comparable<BigDecimal>)val).compareTo((BigDecimal)res) == 0); } else if (val instanceof java.sql.Date) assertEquals(String.format("+++ Fail on %s field", fieldName), - val.toString(), df.collect()[0].get(0).toString()); + val.toString(), ((Row[])df.collect())[0].get(0).toString()); else if (val.getClass().isArray()) assertTrue(String.format("+++ Fail on %s field", fieldName), 1 <= df.count()); else { - assertTrue(String.format("+++ Fail on %s field", fieldName), df.collect().length > 0); - assertTrue(String.format("+++ Fail on %s field", fieldName), df.collect()[0].size() > 0); - assertEquals(String.format("+++ Fail on %s field", fieldName), val, df.collect()[0].get(0)); + assertTrue(String.format("+++ Fail 
on %s field", fieldName), ((Row[])df.collect()).length > 0); + assertTrue(String.format("+++ Fail on %s field", fieldName), ((Row[])df.collect())[0].size() > 0); + assertEquals(String.format("+++ Fail on %s field", fieldName), val, ((Row[])df.collect())[0].get(0)); } info(String.format("+++ Query on the filed: %s : %s passed", fieldName, f.getType().getSimpleName())); http://git-wip-us.apache.org/repos/asf/ignite/blob/caf7b225/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 3514435..a4972d1 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -36,7 +36,7 @@ <properties> <ignite.edition>fabric</ignite.edition> <hadoop.version>2.4.1</hadoop.version> - <spark.version>1.5.2</spark.version> + <spark.version>2.1.0</spark.version> <spring.version>4.1.0.RELEASE</spring.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.build.timestamp.format>MMMM d yyyy</maven.build.timestamp.format> @@ -99,7 +99,6 @@ <scala211.library.version>2.11.7</scala211.library.version> <slf4j.version>1.7.7</slf4j.version> <slf4j16.version>1.6.4</slf4j16.version> - <spark.version>1.5.2</spark.version> <spring.version>4.1.0.RELEASE</spring.version> <spring41.osgi.feature.version>4.1.7.RELEASE_1</spring41.osgi.feature.version> <tomcat.version>8.0.23</tomcat.version>
