Repository: kudu Updated Branches: refs/heads/master 71c1aa680 -> d63fbc9b2
[examples] Add basic Spark example (scala) This patch adds a basic Kudu-Spark example that utilizes the Kudu-Spark integration. It will allow users to pull down the pom.xml and scala source, then build and execute from their local machine. Change-Id: I9ba09f0118c054a07b951e241c31d66245c57d3f Reviewed-on: http://gerrit.cloudera.org:8080/11788 Reviewed-by: Will Berkeley <wdberke...@gmail.com> Tested-by: Kudu Jenkins Reviewed-by: Grant Henke <granthe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/086d8a02 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/086d8a02 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/086d8a02 Branch: refs/heads/master Commit: 086d8a02a1d67c9e8d579df754ba4668cca4f649 Parents: 71c1aa6 Author: Mitch Barnett <mbarn...@cloudera.com> Authored: Thu Oct 25 13:01:25 2018 -0500 Committer: Grant Henke <granthe...@apache.org> Committed: Mon Nov 5 18:21:48 2018 +0000 ---------------------------------------------------------------------- examples/scala/spark-example/README.adoc | 62 ++++++++++ examples/scala/spark-example/pom.xml | 118 +++++++++++++++++++ .../kudu/spark/examples/SparkExample.scala | 107 +++++++++++++++++ 3 files changed, 287 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/086d8a02/examples/scala/spark-example/README.adoc ---------------------------------------------------------------------- diff --git a/examples/scala/spark-example/README.adoc b/examples/scala/spark-example/README.adoc new file mode 100644 index 0000000..8969325 --- /dev/null +++ b/examples/scala/spark-example/README.adoc @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + += Kudu-Spark example README +:author: Kudu Team +:homepage: https://kudu.apache.org/ + +This is an example program that uses the Kudu-Spark integration to: + +- Create a table +- Insert some rows +- Upsert some rows +- Scan some rows +** Scan rows using RDD/DataFrame methods +** Scan rows using SparkSQL +- Delete the table + +To build the example, ensure maven is installed and execute +the following from the 'spark-example' directory. This will create a Spark +application jar in the 'target' directory: + +[source,bash] +---- +$ mvn package +---- + +To configure the kudu-spark example, there are two Java system properties +available: + +- kuduMasters: A comma-separated list of Kudu master addresses. + Default: 'localhost:7051'. +- tableName: The name of the table to use for the example program. This + table should not exist in Kudu. Default: 'spark_test'. + +The application can be run using `spark-submit`. For example, to run the +example against a Spark cluster running on YARN, use a command like the +following: + +[source,bash] +---- +$ spark-submit --class org.apache.kudu.spark.examples.SparkExample --master yarn \ +--driver-java-options \ +'-DkuduMasters=master1:7051,master2:7051,master3:7051 -DtableName=test_table' \ +target/kudu-spark-example-1.0-SNAPSHOT.jar +---- + +You will need the Kudu cluster to be up and running and Spark correctly +configured for the example to work. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/086d8a02/examples/scala/spark-example/pom.xml ---------------------------------------------------------------------- diff --git a/examples/scala/spark-example/pom.xml b/examples/scala/spark-example/pom.xml new file mode 100644 index 0000000..ed310a7 --- /dev/null +++ b/examples/scala/spark-example/pom.xml @@ -0,0 +1,118 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.kudu</groupId> + <artifactId>kudu-spark-example</artifactId> + <packaging>jar</packaging> + <version>1.0-SNAPSHOT</version> + <name>Kudu Spark Examples</name> + + + <build> + <plugins> + <!-- Add the scala plugin. 
--> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.3.2</version> + <executions> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>add-source</goal> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.2</version> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + <!-- Shade the jar so it is directly executable. --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.1.1</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.kudu.spark.examples.SparkExample</mainClass> + </transformer> + </transformers> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.kudu</groupId> + <artifactId>kudu-spark2_2.11</artifactId> + <version>1.7.1</version> + </dependency> + + <!-- The Spark dependencies are provided. --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + <version>2.3.2</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.11</artifactId> + <version>2.3.2</version> + <scope>provided</scope> + </dependency> + + <!-- For logging messages. 
--> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>1.7.25</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/kudu/blob/086d8a02/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala ---------------------------------------------------------------------- diff --git a/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala b/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala new file mode 100644 index 0000000..27b7fd5 --- /dev/null +++ b/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
// See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.kudu.spark.examples

import collection.JavaConverters._

import org.slf4j.LoggerFactory

import org.apache.kudu.client._
import org.apache.kudu.spark.kudu._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.Row

/**
 * A basic example of the Kudu-Spark integration: creates a table, inserts
 * and upserts rows, reads them back via an RDD and via SparkSQL, then
 * deletes the table.
 */
object SparkExample {

  // Kudu master address list, configurable with -DkuduMasters=host1:7051,host2:7051.
  val kuduMasters: String = System.getProperty("kuduMasters", "localhost:7051")
  // Name of the table to create; it should not already exist in Kudu.
  val tableName: String = System.getProperty("tableName", "spark_test")
  val nameCol = "name"
  val idCol = "id"

  val logger = LoggerFactory.getLogger(SparkExample.getClass)

  // Case class used to insert data into the table. Because it is a case
  // class, Spark derives the DataFrame schema from its fields, so no
  // explicit schema is needed for the RDD -> toDF() conversions below.
  case class User(name: String, id: Int)

  def main(args: Array[String]): Unit = {
    // The SparkSession is the entry point to SparkSQL and the
    // Dataset/DataFrame API; the KuduContext is a serializable container
    // for Kudu client connections.
    val spark = SparkSession.builder.appName("KuduSparkExample").getOrCreate()
    val kuduContext = new KuduContext(kuduMasters, spark.sparkContext)

    // Brings the implicit RDD -> DataFrame conversions (toDF) into scope.
    import spark.implicits._

    // The schema of the table we're going to create: a non-nullable integer
    // key column and a non-nullable string column.
    val schema = StructType(
      List(
        StructField(idCol, IntegerType, nullable = false),
        StructField(nameCol, StringType, nullable = false)
      )
    )

    try {
      // Create the table with 3 hash partitions (tablets) and the default
      // number of replicas, as set by the Kudu service configuration.
      if (!kuduContext.tableExists(tableName)) {
        kuduContext.createTable(tableName, schema, Seq(idCol),
          new CreateTableOptions().addHashPartitions(List(idCol).asJava, 3))
      }

      // Write to the table.
      logger.info(s"Writing to table '$tableName'")
      val data = Array(User("userA", 1234), User("userB", 5678))
      val userRDD = spark.sparkContext.parallelize(data)
      val userDF = userRDD.toDF()
      kuduContext.insertRows(userDF, tableName)

      // Read from the table using an RDD.
      logger.info("Reading back the rows just written")
      val readCols = Seq(nameCol, idCol)
      val readRDD = kuduContext.kuduRDD(spark.sparkContext, tableName, readCols)
      val userTuple = readRDD.map { case Row(name: String, id: Int) => (name, id) }
      userTuple.collect().foreach(println(_))

      // Upsert some rows: id 1234 updates an existing row, id 7777 is new.
      logger.info(s"Upserting to table '$tableName'")
      val upsertUsers = Array(User("newUserA", 1234), User("userC", 7777))
      val upsertUsersRDD = spark.sparkContext.parallelize(upsertUsers)
      val upsertUsersDF = upsertUsersRDD.toDF()
      kuduContext.upsertRows(upsertUsersDF, tableName)

      // Read the updated table back using SparkSQL.
      val sqlDF = spark.sqlContext.read.options(
        Map("kudu.master" -> kuduMasters, "kudu.table" -> tableName)).kudu
      sqlDF.createOrReplaceTempView(tableName)
      spark.sqlContext.sql(s"select * from $tableName where $idCol > 1000").show
    } finally {
      // Clean up. Guard the delete so that a failure before the table was
      // created does not raise a second (table-not-found) exception here,
      // which would mask the original error.
      logger.info(s"Deleting table '$tableName' and closing down the session")
      if (kuduContext.tableExists(tableName)) {
        kuduContext.deleteTable(tableName)
      }
      spark.close()
    }
  }
}