Repository: kudu
Updated Branches:
  refs/heads/master 71c1aa680 -> d63fbc9b2


[examples] Add basic Spark example (scala)

This patch adds a basic Kudu-Spark example that utilizes the
Kudu-Spark integration.
It will allow users to pull down the pom.xml and scala source,
then build and execute from their local machine.

Change-Id: I9ba09f0118c054a07b951e241c31d66245c57d3f
Reviewed-on: http://gerrit.cloudera.org:8080/11788
Reviewed-by: Will Berkeley <wdberke...@gmail.com>
Tested-by: Kudu Jenkins
Reviewed-by: Grant Henke <granthe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/086d8a02
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/086d8a02
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/086d8a02

Branch: refs/heads/master
Commit: 086d8a02a1d67c9e8d579df754ba4668cca4f649
Parents: 71c1aa6
Author: Mitch Barnett <mbarn...@cloudera.com>
Authored: Thu Oct 25 13:01:25 2018 -0500
Committer: Grant Henke <granthe...@apache.org>
Committed: Mon Nov 5 18:21:48 2018 +0000

----------------------------------------------------------------------
 examples/scala/spark-example/README.adoc        |  62 ++++++++++
 examples/scala/spark-example/pom.xml            | 118 +++++++++++++++++++
 .../kudu/spark/examples/SparkExample.scala      | 107 +++++++++++++++++
 3 files changed, 287 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/086d8a02/examples/scala/spark-example/README.adoc
----------------------------------------------------------------------
diff --git a/examples/scala/spark-example/README.adoc 
b/examples/scala/spark-example/README.adoc
new file mode 100644
index 0000000..8969325
--- /dev/null
+++ b/examples/scala/spark-example/README.adoc
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+= Kudu-Spark example README
+:author: Kudu Team
+:homepage: https://kudu.apache.org/
+
+This is an example program that uses the Kudu-Spark integration to:
+
+- Create a table
+- Insert some rows
+- Upsert some rows
+- Scan some rows
+** Scan rows using RDD/DataFrame methods
+** Scan rows using SparkSQL
+- Delete the table
+
+To build the example, ensure maven is installed and execute
+the following from the 'spark-example' directory. This will create a Spark
+application jar in the 'target' directory:
+
+[source,bash]
+----
+$ mvn package
+----
+
+To configure the kudu-spark example, there are two Java system properties
+available:
+
+- kuduMasters: A comma-separated list of Kudu master addresses. 
+  Default: 'localhost:7051'.
+- tableName: The name of the table to use for the example program. This
+  table should not exist in Kudu. Default: 'spark_test'.
+
+The application can be run using `spark-submit`. For example, to run the
+example against a Spark cluster running on YARN, use a command like the
+following:
+
+[source,bash]
+----
+$ spark-submit --class org.apache.kudu.spark.examples.SparkExample --master yarn \
+--driver-java-options \
+'-DkuduMasters=master1:7051,master2:7051,master3:7051 -DtableName=test_table' \
+target/kudu-spark-example-1.0-SNAPSHOT.jar
+----
+
+You will need the Kudu cluster to be up and running and Spark correctly
+configured for the example to work.

http://git-wip-us.apache.org/repos/asf/kudu/blob/086d8a02/examples/scala/spark-example/pom.xml
----------------------------------------------------------------------
diff --git a/examples/scala/spark-example/pom.xml 
b/examples/scala/spark-example/pom.xml
new file mode 100644
index 0000000..ed310a7
--- /dev/null
+++ b/examples/scala/spark-example/pom.xml
@@ -0,0 +1,118 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-spark-example</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>Kudu Spark Examples</name>

  <build>
    <plugins>
      <!-- Add the scala plugin. It is bound to process-resources so the Scala
           sources compile before the Java compiler runs. -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.2</version>
        <configuration>
          <!-- Spark 2.x requires Java 8 or later. -->
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <!-- Shade the jar so it is directly executable. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <!-- Must match the package declared in SparkExample.scala. -->
                  <mainClass>org.apache.kudu.spark.examples.SparkExample</mainClass>
                </transformer>
              </transformers>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <!-- Strip signature files from shaded dependencies so the
                         merged jar does not fail signature verification. -->
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.kudu</groupId>
      <artifactId>kudu-spark2_2.11</artifactId>
      <version>1.7.1</version>
    </dependency>

    <!-- The Spark dependencies are provided (by spark-submit at run time),
         so they are not bundled into the shaded jar. -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>

    <!-- For logging messages. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.25</version>
    </dependency>
  </dependencies>
</project>

http://git-wip-us.apache.org/repos/asf/kudu/blob/086d8a02/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala
 
b/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala
new file mode 100644
index 0000000..27b7fd5
--- /dev/null
+++ 
b/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.spark.examples
+
+import collection.JavaConverters._
+
+import org.slf4j.LoggerFactory
+
+import org.apache.kudu.client._
+import org.apache.kudu.spark.kudu._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
+import org.apache.spark.sql.Row
+
object SparkExample {

  // Comma-separated Kudu master address list; override with
  // -DkuduMasters=host1:7051,host2:7051 on the spark-submit command line.
  val kuduMasters: String = System.getProperty("kuduMasters", "localhost:7051")
  // Name of the table to create; per the README it should not already exist.
  val tableName: String = System.getProperty("tableName", "spark_test")
  val nameCol = "name"
  val idCol = "id"

  val logger = LoggerFactory.getLogger(SparkExample.getClass)

  // Define a class that we'll use to insert data into the table.
  // Because we're defining a case class here, we circumvent the need to
  // explicitly define a schema later in the code, like during our
  // RDD -> toDF() calls later on.
  case class User(name: String, id: Int)

  def main(args: Array[String]) {
    // Define our session and context variables for use throughout the program.
    // The kuduContext is a serializable container for Kudu client connections,
    // while the SparkSession is the entry point to SparkSQL and the
    // Dataset/DataFrame API.
    val spark = SparkSession.builder.appName("KuduSparkExample").getOrCreate()
    val kuduContext = new KuduContext(kuduMasters, spark.sqlContext.sparkContext)

    // Import a class from the SparkSession we instantiated above, to allow
    // for easier RDD -> DF conversions.
    import spark.implicits._

    // The schema of the table we're going to create.
    val schema = StructType(
                       List(
                         StructField(idCol, IntegerType, false),
                         StructField(nameCol, StringType, false)
                       )
                  )

    // Track whether this run created the table, so cleanup never drops a
    // table that existed before the example ran.
    var tableWasCreated = false

    try {
      // Create the table, with 3 partitions (tablets) and default number of
      // replicas, as set by the Kudu service configuration.
      if (!kuduContext.tableExists(tableName)) {
        kuduContext.createTable(tableName, schema, Seq(idCol),
          new CreateTableOptions().addHashPartitions(List(idCol).asJava, 3))
        tableWasCreated = true
      }

      // Write to the table.
      logger.info(s"Writing to table '$tableName'")
      val data = Array(User("userA", 1234), User("userB", 5678))
      val userRDD = spark.sparkContext.parallelize(data)
      val userDF = userRDD.toDF()
      kuduContext.insertRows(userDF, tableName)

      // Read from the table using an RDD.
      logger.info("Reading back the rows just written")
      val readCols = Seq(nameCol, idCol)
      val readRDD = kuduContext.kuduRDD(spark.sparkContext, tableName, readCols)
      val userTuple = readRDD.map { case Row(name: String, id: Int) => (name, id) }
      userTuple.collect().foreach(println(_))

      // Upsert some rows: id 1234 already exists, so it is updated in place;
      // id 7777 is new, so it is inserted.
      logger.info(s"Upserting to table '$tableName'")
      val upsertUsers = Array(User("newUserA", 1234), User("userC", 7777))
      val upsertUsersRDD = spark.sparkContext.parallelize(upsertUsers)
      val upsertUsersDF = upsertUsersRDD.toDF()
      kuduContext.upsertRows(upsertUsersDF, tableName)

      // Read the updated table using SparkSQL.
      val sqlDF = spark.sqlContext.read.options(
        Map("kudu.master" -> kuduMasters, "kudu.table" -> tableName)).kudu
      sqlDF.createOrReplaceTempView(tableName)
      spark.sqlContext.sql(s"select * from $tableName where $idCol > 1000").show
    }

    finally {
      // Clean up. Only delete the table if this run created it; otherwise an
      // accidental run against a pre-existing table would destroy its data.
      if (tableWasCreated) {
        logger.info(s"Deleting table '$tableName' and closing down the session")
        kuduContext.deleteTable(tableName)
      }
      spark.close()
    }
  }
}

Reply via email to