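[java] Upgrade to Spark 2.4

In Spark 2.4, spark-avro is now part of Spark itself. This change migrates Kudu's spark-avro dependency from com.databricks:spark-avro to org.apache.spark:spark-avro (now versioned together with Spark) and adds round-trip tests (avro, csv, and parquet) to ensure that the import/export functionality does not break.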
Change-Id: Id1f8de543276c4dc82a57c4a2228ae2374f2d87f Reviewed-on: http://gerrit.cloudera.org:8080/11912 Reviewed-by: Adar Dembo <[email protected]> Reviewed-by: Hao Hao <[email protected]> Tested-by: Grant Henke <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c136efe4 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c136efe4 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c136efe4 Branch: refs/heads/master Commit: c136efe47a0ebacebe6774701ffa06fc9d41a189 Parents: 51736f3 Author: Grant Henke <[email protected]> Authored: Thu Nov 8 16:04:28 2018 -0600 Committer: Grant Henke <[email protected]> Committed: Tue Nov 13 14:28:37 2018 +0000 ---------------------------------------------------------------------- java/gradle/dependencies.gradle | 5 +- .../spark/tools/TestImportExportFiles.scala | 117 +++++++++++++++---- 2 files changed, 96 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/c136efe4/java/gradle/dependencies.gradle ---------------------------------------------------------------------- diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle index 52a5079..71cfe16 100755 --- a/java/gradle/dependencies.gradle +++ b/java/gradle/dependencies.gradle @@ -53,8 +53,7 @@ versions += [ scalatest : "3.0.5", scopt : "3.7.0", slf4j : "1.7.25", - spark : "2.3.2", - sparkAvro : "4.0.0", + spark : "2.4.0", spotBugs : "3.1.6", yetus : "0.8.0" ] @@ -104,7 +103,7 @@ libs += [ scopt : "com.github.scopt:scopt_$versions.scalaBase:$versions.scopt", slf4jApi : "org.slf4j:slf4j-api:$versions.slf4j", slf4jLog4j12 : "org.slf4j:slf4j-log4j12:$versions.slf4j", - sparkAvro : "com.databricks:spark-avro_$versions.scalaBase:$versions.sparkAvro", + sparkAvro : "org.apache.spark:spark-avro_$versions.scalaBase:$versions.spark", sparkCore : 
"org.apache.spark:spark-core_$versions.scalaBase:$versions.spark", sparkSql : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark", sparkSqlTest : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark:tests", http://git-wip-us.apache.org/repos/asf/kudu/blob/c136efe4/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala index 040c810..6034568 100644 --- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala +++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala @@ -17,14 +17,18 @@ package org.apache.kudu.spark.tools +import java.io.File +import java.nio.file.Files import java.nio.file.Paths import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder import org.apache.kudu.Schema import org.apache.kudu.Type import org.apache.kudu.client.CreateTableOptions +import org.apache.kudu.client.KuduTable import org.apache.kudu.spark.kudu._ import org.junit.Assert._ +import org.junit.Before import org.junit.Test import org.spark_project.guava.collect.ImmutableList @@ -32,31 +36,34 @@ import scala.collection.JavaConverters._ class TestImportExportFiles extends KuduTestSuite { - private val TABLE_NAME: String = "TestImportExportFiles" - private val TABLE_DATA_PATH: String = "/TestImportExportFiles.csv" + private val TableDataPath = "/TestImportExportFiles.csv" + private val TableName = "TestImportExportFiles" + private val TableSchema = { + val columns = ImmutableList.of( + new ColumnSchemaBuilder("key", Type.STRING).key(true).build(), + new ColumnSchemaBuilder("column1_i", Type.STRING).build(), + new ColumnSchemaBuilder("column2_d", Type.STRING) + .nullable(true) + 
.build(), + new ColumnSchemaBuilder("column3_s", Type.STRING).build(), + new ColumnSchemaBuilder("column4_b", Type.STRING).build() + ) + new Schema(columns) + } + private val options = new CreateTableOptions() + .setRangePartitionColumns(List("key").asJava) + .setNumReplicas(1) - @Test - def testSparkImportExport() { - val schema: Schema = { - val columns = ImmutableList.of( - new ColumnSchemaBuilder("key", Type.STRING).key(true).build(), - new ColumnSchemaBuilder("column1_i", Type.STRING).build(), - new ColumnSchemaBuilder("column2_d", Type.STRING) - .nullable(true) - .build(), - new ColumnSchemaBuilder("column3_s", Type.STRING).build(), - new ColumnSchemaBuilder("column4_b", Type.STRING).build() - ) - new Schema(columns) - } - val tableOptions = new CreateTableOptions() - .setRangePartitionColumns(List("key").asJava) - .setNumReplicas(1) - kuduClient.createTable(TABLE_NAME, schema, tableOptions) + @Before + def setUp(): Unit = { + kuduClient.createTable(TableName, TableSchema, options) + } + @Test + def testCSVImport() { // Get the absolute path of the resource file. 
val schemaResource = - classOf[TestImportExportFiles].getResource(TABLE_DATA_PATH) + classOf[TestImportExportFiles].getResource(TableDataPath) val dataPath = Paths.get(schemaResource.toURI).toAbsolutePath ImportExportFiles.testMain( @@ -65,15 +72,79 @@ class TestImportExportFiles extends KuduTestSuite { "--format=csv", s"--master-addrs=${harness.getMasterAddressesAsString}", s"--path=$dataPath", - s"--table-name=$TABLE_NAME", + s"--table-name=$TableName", "--delimiter=,", "--header=true", "--inferschema=true" ), ss ) - val rdd = kuduContext.kuduRDD(ss.sparkContext, TABLE_NAME, List("key")) + val rdd = kuduContext.kuduRDD(ss.sparkContext, TableName, List("key")) assert(rdd.collect.length == 4) assertEquals(rdd.collect().mkString(","), "[1],[2],[3],[4]") } + + @Test + def testRoundTrips(): Unit = { + val table = kuduClient.openTable(TableName) + loadSampleData(table, 50) + runRoundTripTest(TableName, s"$TableName-avro", "avro") + runRoundTripTest(TableName, s"$TableName-csv", "csv") + runRoundTripTest(TableName, s"$TableName-parquet", "parquet") + } + + // TODO(KUDU-2454): Use random schemas and random data to ensure all type/values round-trip. + private def loadSampleData(table: KuduTable, numRows: Int): Unit = { + val session = kuduClient.newSession() + val rows = Range(0, numRows).map { i => + val insert = table.newInsert + val row = insert.getRow + row.addString(0, i.toString) + row.addString(1, i.toString) + row.addString(3, i.toString) + row.addString(4, i.toString) + session.apply(insert) + } + session.close + } + + private def runRoundTripTest(fromTable: String, toTable: String, format: String): Unit = { + val dir = Files.createTempDirectory("round-trip") + val path = new File(dir.toFile, s"$fromTable-$format").getAbsolutePath + + // Export the data. 
+ ImportExportFiles.testMain( + Array( + "--operation=export", + s"--format=$format", + s"--master-addrs=${harness.getMasterAddressesAsString}", + s"--path=$path", + s"--table-name=$fromTable", + s"--header=true" + ), + ss + ) + + // Create the target table. + kuduClient.createTable(toTable, TableSchema, options) + + // Import the data. + ImportExportFiles.testMain( + Array( + "--operation=import", + s"--format=$format", + s"--master-addrs=${harness.getMasterAddressesAsString}", + s"--path=$path", + s"--table-name=$toTable", + s"--header=true" + ), + ss + ) + + // Verify the tables match. + // TODO(KUDU-2454): Verify every value to ensure all values round trip. + val rdd1 = kuduContext.kuduRDD(ss.sparkContext, fromTable, List("key")) + val rdd2 = kuduContext.kuduRDD(ss.sparkContext, toTable, List("key")) + assertResult(rdd1.count())(rdd2.count()) + } }
