Repository: kudu
Updated Branches:
  refs/heads/master 9130bb0e1 -> 8020cbf27
KUDU-2539: Supporting Spark streaming DataFrame in KuduContext

This solution follows the way other sinks, e.g. the KafkaSink, are
implemented; for details see
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala#L87

There, queryExecution.toRdd.foreachPartition is called on the DataFrame
to access the InternalRows, which are then mapped to Rows by Catalyst
converters. (A standalone sketch of this pattern follows the diffs
below.)

Change-Id: Iead04539d3514920a5d6803c34715e5686124572
Reviewed-on: http://gerrit.cloudera.org:8080/11199
Tested-by: Kudu Jenkins
Reviewed-by: Attila Bukor <[email protected]>
Reviewed-by: Grant Henke <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8020cbf2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8020cbf2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8020cbf2

Branch: refs/heads/master
Commit: 8020cbf2760483c46ed0766dfdebe3c12d0107f1
Parents: 9130bb0
Author: attilapiros <[email protected]>
Authored: Fri Aug 10 07:49:16 2018 -0700
Committer: Grant Henke <[email protected]>
Committed: Wed Sep 26 15:45:21 2018 +0000

----------------------------------------------------------------------
 java/gradle/dependencies.gradle                 |  2 +
 java/kudu-spark/build.gradle                    |  4 +-
 java/kudu-spark/pom.xml                         | 17 +++-
 .../apache/kudu/spark/kudu/KuduContext.scala    | 12 ++-
 .../apache/kudu/spark/kudu/StreamingTest.scala  | 97 ++++++++++++++++++++
 5 files changed, 126 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 5384599..252769a 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -98,6 +98,7 @@ libs += [
     protobufJavaUtil     : "com.google.protobuf:protobuf-java-util:$versions.protobuf",
     protoc               : "com.google.protobuf:protoc:$versions.protobuf",
     scalaLibrary         : "org.scala-lang:scala-library:$versions.scala",
+    scalap               : "org.scala-lang:scalap:$versions.scala",
     scalatest            : "org.scalatest:scalatest_$versions.scalaBase:$versions.scalatest",
     scopt                : "com.github.scopt:scopt_$versions.scalaBase:$versions.scopt",
     slf4jApi             : "org.slf4j:slf4j-api:$versions.slf4j",
@@ -105,5 +106,6 @@ libs += [
     sparkAvro            : "com.databricks:spark-avro_$versions.scalaBase:$versions.sparkAvro",
     sparkCore            : "org.apache.spark:spark-core_$versions.scalaBase:$versions.spark",
     sparkSql             : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark",
+    sparkSqlTest         : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark:tests",
     yetusAnnotations     : "org.apache.yetus:audience-annotations:$versions.yetus"
 ]

http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/kudu-spark/build.gradle
----------------------------------------------------------------------
diff --git a/java/kudu-spark/build.gradle b/java/kudu-spark/build.gradle
index c723902..aa2c774 100644
--- a/java/kudu-spark/build.gradle
+++ b/java/kudu-spark/build.gradle
@@ -24,6 +24,7 @@ dependencies {
   compile libs.yetusAnnotations

   provided libs.scalaLibrary
+  provided libs.scalap
   provided libs.sparkCore
   provided libs.sparkSql
   provided libs.slf4jApi
@@ -32,7 +33,8 @@ dependencies {
   testCompile project(path: ":kudu-client", configuration: "shadowTest")
   testCompile libs.junit
   testCompile libs.scalatest
+  testCompile libs.sparkSqlTest
 }

 // Adjust the artifact name to include the spark and scala base versions.
-archivesBaseName = "kudu-spark${versions.sparkBase}_${versions.scalaBase}"
\ No newline at end of file
+archivesBaseName = "kudu-spark${versions.sparkBase}_${versions.scalaBase}"

http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/kudu-spark/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-spark/pom.xml b/java/kudu-spark/pom.xml
index 7665c2e..07045ae 100644
--- a/java/kudu-spark/pom.xml
+++ b/java/kudu-spark/pom.xml
@@ -67,6 +67,12 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scalap</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <version>${slf4j.version}</version>
@@ -79,8 +85,15 @@
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
-        </dependency>
-        <dependency>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>${junit.version}</version>

http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index b0fb257..a9975b6 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -20,6 +20,8 @@ package org.apache.kudu.spark.kudu

 import java.security.AccessController
 import java.security.PrivilegedAction
+import java.sql.Timestamp
+
 import javax.security.auth.Subject
 import javax.security.auth.login.AppConfigurationEntry
 import javax.security.auth.login.Configuration
@@ -36,6 +38,8 @@ import org.apache.spark.sql.types.DecimalType
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.util.AccumulatorV2
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
@@ -298,7 +302,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
     val schema = data.schema
     // Get the client's last propagated timestamp on the driver.
     val lastPropagatedTimestamp = syncClient.getLastPropagatedTimestamp
-    data.foreachPartition(iterator => {
+    data.queryExecution.toRdd.foreachPartition(iterator => {
       val pendingErrors = writePartitionRows(
         iterator,
         schema,
@@ -317,7 +321,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
   }

   private def writePartitionRows(
-      rows: Iterator[Row],
+      rows: Iterator[InternalRow],
       schema: StructType,
       tableName: String,
       operationType: OperationType,
@@ -334,8 +338,10 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
     val session: KuduSession = syncClient.newSession
     session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
     session.setIgnoreAllDuplicateRows(writeOptions.ignoreDuplicateRowErrors)
+    val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
     try {
-      for (row <- rows) {
+      for (internalRow <- rows) {
+        val row = typeConverter(internalRow).asInstanceOf[Row]
         val operation = operationType.operation(table)
         for ((sparkIdx, kuduIdx) <- indices) {
           if (row.isNullAt(sparkIdx)) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala
new file mode 100644
index 0000000..246d00e
--- /dev/null
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kudu.spark.kudu
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.StreamSinkProvider
+import org.apache.spark.sql.streaming.OutputMode
+import org.junit.Before
+import org.junit.Test
+
+class StreamingTest extends KuduTestSuite {
+
+  implicit var sqlContext: SQLContext = _
+  var kuduOptions: Map[String, String] = _
+
+  @Before
+  def setUp(): Unit = {
+    sqlContext = ss.sqlContext
+    kuduOptions =
+      Map("kudu.table" -> simpleTableName, "kudu.master" -> miniCluster.getMasterAddresses)
+  }
+
+  @Test
+  def testKuduContextWithSparkStreaming() {
+    val spark = ss
+    import spark.implicits._
+    val checkpointDir = java.nio.file.Files.createTempDirectory("spark_kudu")
+    val input = MemoryStream[Int]
+    val query = input
+      .toDS()
+      .map(v => (v + 1, v.toString))
+      .toDF("key", "val")
+      .writeStream
+      .format(classOf[KuduSinkProvider].getCanonicalName)
+      .option("kudu.master", miniCluster.getMasterAddresses)
+      .option("kudu.table", simpleTableName)
+      .option("checkpointLocation", checkpointDir.toFile.getCanonicalPath)
+      .outputMode(OutputMode.Update)
+      .start()
+
+    def verifyOutput(expectedData: Seq[(Int, String)]): Unit = {
+      val df = sqlContext.read.options(kuduOptions).kudu
+      val actual = df.rdd
+        .map { row =>
+          (row.get(0), row.getString(1))
+        }
+        .collect()
+        .toSet
+      assert(actual === expectedData.toSet)
+    }
+    input.addData(1, 2, 3)
+    query.processAllAvailable()
+    verifyOutput(expectedData = Seq((2, "1"), (3, "2"), (4, "3")))
+    query.stop()
+  }
+}
+
+class KuduSinkProvider extends StreamSinkProvider with DataSourceRegister {
+
+  override def createSink(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink =
+    new KuduSink(sqlContext, parameters)
+
+  override def shortName(): String = "kudu"
+}
+
+class KuduSink(sqlContext: SQLContext, parameters: Map[String, String]) extends Sink {
+
+  private val kuduContext =
+    new KuduContext(parameters("kudu.master"), sqlContext.sparkContext)
+
+  private val tablename = parameters("kudu.table")
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    kuduContext.upsertRows(data, tablename)
+  }
+}
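
----------------------------------------------------------------------

As referenced in the commit message above, here is a minimal standalone
sketch of the write-path pattern this change adopts. It is illustrative
only: WritePatternSketch and the handleRow callback are hypothetical
names, not part of the Kudu or Spark APIs. queryExecution.toRdd exposes
the DataFrame's InternalRows while reusing the already-analyzed plan,
and a Catalyst converter turns each InternalRow back into an external
Row inside the partition.

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.CatalystTypeConverters

object WritePatternSketch {
  def writeRows(data: DataFrame)(handleRow: Row => Unit): Unit = {
    // Only the schema is captured by the closure; it is serializable.
    val schema = data.schema
    // toRdd reuses the existing plan, so this path also works for the
    // micro-batch DataFrames handed to a streaming Sink's addBatch().
    data.queryExecution.toRdd.foreachPartition { iterator =>
      // Build the converter on the executor, as KuduContext now does.
      val toScala = CatalystTypeConverters.createToScalaConverter(schema)
      iterator.foreach { internalRow =>
        handleRow(toScala(internalRow).asInstanceOf[Row])
      }
    }
  }
}

Creating the converter inside the partition function mirrors the
writePartitionRows change above: the converter itself never has to be
shipped from the driver, only the schema does.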
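
For context on why the write path drops to InternalRows at all (a
summary of Spark behavior the patch relies on, not a statement from the
patch itself): the DataFrame that Structured Streaming hands to
Sink.addBatch still reports isStreaming == true, so re-planning actions
on it such as DataFrame.rdd or DataFrame.foreachPartition fail with an
AnalysisException ("Queries with streaming sources must be executed
with writeStream.start()"), whereas reusing the existing queryExecution
avoids that re-analysis. A hypothetical sink illustrating the safe
path:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink

// Illustrative only; this class is not part of the patch.
class CountingSink(sqlContext: SQLContext) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // data.rdd or data.foreachPartition would re-analyze the plan here
    // and hit the AnalysisException described above; acting on the
    // existing query execution's RDD avoids that.
    val rowCount = data.queryExecution.toRdd.count()
    println(s"batch $batchId contained $rowCount rows")
  }
}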
