Repository: kudu Updated Branches: refs/heads/master 8513685ba -> ec654c49f
[test] Migrate the Spark tests to use the new KuduRule Changes the KuduTestSuite base class to use the new KuduRule and adjusts the Spark tests as needed. Change-Id: I627161476092a555b9f318f2b2c1fa79a54507f4 Reviewed-on: http://gerrit.cloudera.org:8080/11548 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ec654c49 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ec654c49 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ec654c49 Branch: refs/heads/master Commit: ec654c49f2814b4677015788d7a8ea13ca405317 Parents: 8513685 Author: Grant Henke <[email protected]> Authored: Sun Sep 30 23:05:34 2018 -0500 Committer: Grant Henke <[email protected]> Committed: Tue Oct 2 19:03:59 2018 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/backup/TestKuduBackup.scala | 4 +-- .../kudu/spark/tools/ITBigLinkedListTest.scala | 4 +-- .../spark/tools/TestImportExportFiles.scala | 2 +- .../kudu/spark/kudu/DefaultSourceTest.scala | 38 +++++++++----------- .../kudu/spark/kudu/KuduContextTest.scala | 2 +- .../apache/kudu/spark/kudu/KuduTestSuite.scala | 30 +++++----------- .../apache/kudu/spark/kudu/StreamingTest.scala | 4 +-- 7 files changed, 34 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/ec654c49/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala ---------------------------------------------------------------------- diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala index 1dc7b96..0650ad9 100644 --- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala +++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala @@ -351,11 +351,11 @@ class TestKuduBackup extends KuduTestSuite { val path = dir.toUri.toString val backupOptions = - new KuduBackupOptions(tableNames, path, miniCluster.getMasterAddressesAsString) + new KuduBackupOptions(tableNames, path, harness.getMasterAddressesAsString) KuduBackup.run(backupOptions, ss) val restoreOptions = - new KuduRestoreOptions(tableNames, path, miniCluster.getMasterAddressesAsString) + new KuduRestoreOptions(tableNames, path, harness.getMasterAddressesAsString) KuduRestore.run(restoreOptions, ss) FileUtils.deleteDirectory(dir.toFile) http://git-wip-us.apache.org/repos/asf/kudu/blob/ec654c49/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/ITBigLinkedListTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/ITBigLinkedListTest.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/ITBigLinkedListTest.scala index fc01350..fbc7d49 100644 --- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/ITBigLinkedListTest.scala +++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/ITBigLinkedListTest.scala @@ -37,7 +37,7 @@ class ITBigLinkedListTest extends KuduTestSuite { "--hash-partitions=2", "--range-partitions=2", "--replicas=1", - s"--master-addrs=${miniCluster.getMasterAddressesAsString}" + s"--master-addrs=${harness.getMasterAddressesAsString}" ), ss ) @@ -76,7 +76,7 @@ class ITBigLinkedListTest extends KuduTestSuite { } val counts = Verifier - .testMain(Array(s"--master-addrs=${miniCluster.getMasterAddressesAsString}"), ss) + .testMain(Array(s"--master-addrs=${harness.getMasterAddressesAsString}"), ss) assertEquals(2 * 2 * 10000, counts.referenced) assertEquals(1, counts.extrareferences) assertEquals(2, counts.unreferenced) http://git-wip-us.apache.org/repos/asf/kudu/blob/ec654c49/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala index 54f801d..040c810 100644 --- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala +++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala @@ -63,7 +63,7 @@ class TestImportExportFiles extends KuduTestSuite { Array( "--operation=import", "--format=csv", - s"--master-addrs=${miniCluster.getMasterAddressesAsString}", + s"--master-addrs=${harness.getMasterAddressesAsString}", s"--path=$dataPath", s"--table-name=$TABLE_NAME", "--delimiter=,", http://git-wip-us.apache.org/repos/asf/kudu/blob/ec654c49/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala index 624257c..bc0d93c 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -49,7 +49,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { sqlContext = ss.sqlContext kuduOptions = - Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddressesAsString) + Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString) sqlContext.read.options(kuduOptions).kudu.createOrReplaceTempView(tableName) } @@ -72,7 +72,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { // now use new options to refer to the new table name val newOptions: Map[String, String] = - Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddressesAsString) + Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString) val checkDf = sqlContext.read.options(newOptions).kudu assert(checkDf.schema === df.schema) @@ -110,7 +110,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { // now use new options to refer to the new table name val newOptions: Map[String, String] = - Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddressesAsString) + Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString) val checkDf = sqlContext.read.options(newOptions).kudu assert(checkDf.schema === df.schema) @@ -194,7 +194,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { val updateDF = baseDF.withColumn("c2_s", lit("abc")) val newOptions: Map[String, String] = Map( "kudu.table" -> tableName, - "kudu.master" -> miniCluster.getMasterAddressesAsString, + "kudu.master" -> harness.getMasterAddressesAsString, "kudu.operation" -> "insert", "kudu.ignoreDuplicateRowErrors" -> "true") updateDF.write.options(newOptions).mode("append").kudu @@ -226,7 +226,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { val updateDF = baseDF.withColumn("c2_s", lit("abc")) val newOptions: Map[String, String] = Map( "kudu.table" -> tableName, - "kudu.master" -> miniCluster.getMasterAddressesAsString, + "kudu.master" -> harness.getMasterAddressesAsString, "kudu.operation" -> "insert-ignore") updateDF.write.options(newOptions).mode("append").kudu @@ -311,9 +311,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { val dataDF = sqlContext.read .options( - Map( - "kudu.master" -> miniCluster.getMasterAddressesAsString, - "kudu.table" -> simpleTableName)) + Map("kudu.master" -> harness.getMasterAddressesAsString, "kudu.table" -> simpleTableName)) .kudu val nullDF = sqlContext @@ -344,9 +342,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { val dataDF = sqlContext.read .options( - Map( - "kudu.master" -> miniCluster.getMasterAddressesAsString, - "kudu.table" -> simpleTableName)) + Map("kudu.master" -> harness.getMasterAddressesAsString, "kudu.table" -> simpleTableName)) .kudu val nullDF = sqlContext @@ -354,14 +350,14 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { .toDF("key", "val") val options_0: Map[String, String] = Map( "kudu.table" -> simpleTableName, - "kudu.master" -> miniCluster.getMasterAddressesAsString, + "kudu.master" -> harness.getMasterAddressesAsString, "kudu.ignoreNull" -> "true") nullDF.write.options(options_0).mode("append").kudu assert(dataDF.collect.toList === nonNullDF.collect.toList) kuduContext.updateRows(nonNullDF, simpleTableName) val options_1: Map[String, String] = - Map("kudu.table" -> simpleTableName, "kudu.master" -> miniCluster.getMasterAddressesAsString) + Map("kudu.table" -> simpleTableName, "kudu.master" -> harness.getMasterAddressesAsString) nullDF.write.options(options_1).mode("append").kudu assert(dataDF.collect.toList === nullDF.collect.toList) @@ -406,7 +402,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { def testTableFaultTolerantScan() { kuduOptions = Map( "kudu.table" -> tableName, - "kudu.master" -> miniCluster.getMasterAddressesAsString, + "kudu.master" -> harness.getMasterAddressesAsString, "kudu.faultTolerantScan" -> "true") val table = "faultTolerantScanTest" @@ -659,7 +655,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { kuduSession.apply(insert) } val options: Map[String, String] = - Map("kudu.table" -> testTableName, "kudu.master" -> miniCluster.getMasterAddressesAsString) + Map("kudu.table" -> testTableName, "kudu.master" -> harness.getMasterAddressesAsString) sqlContext.read.options(options).kudu.createOrReplaceTempView(testTableName) val checkPrefixCount = { prefix: String => @@ -722,7 +718,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { .setNumReplicas(1)) val newOptions: Map[String, String] = - Map("kudu.table" -> insertTable, "kudu.master" -> miniCluster.getMasterAddressesAsString) + Map("kudu.table" -> insertTable, "kudu.master" -> harness.getMasterAddressesAsString) sqlContext.read .options(newOptions) .kudu @@ -749,7 +745,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { .setNumReplicas(1)) val newOptions: Map[String, String] = - Map("kudu.table" -> insertTable, "kudu.master" -> miniCluster.getMasterAddressesAsString) + Map("kudu.table" -> insertTable, "kudu.master" -> harness.getMasterAddressesAsString) sqlContext.read .options(newOptions) .kudu @@ -779,7 +775,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { .setNumReplicas(1)) val newOptions: Map[String, String] = - Map("kudu.table" -> newTable, "kudu.master" -> miniCluster.getMasterAddressesAsString) + Map("kudu.table" -> newTable, "kudu.master" -> harness.getMasterAddressesAsString) df.write.options(newOptions).mode("append").kudu val checkDf = sqlContext.read.options(newOptions).kudu @@ -826,7 +822,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { def testScanLocality() { kuduOptions = Map( "kudu.table" -> tableName, - "kudu.master" -> miniCluster.getMasterAddressesAsString, + "kudu.master" -> harness.getMasterAddressesAsString, "kudu.scanLocality" -> "closest_replica") val table = "scanLocalityTest" @@ -907,7 +903,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { def testScanRequestTimeoutPropagation() { kuduOptions = Map( "kudu.table" -> tableName, - "kudu.master" -> miniCluster.getMasterAddressesAsString, + "kudu.master" -> harness.getMasterAddressesAsString, "kudu.scanRequestTimeoutMs" -> "1") val dataFrame = sqlContext.read.options(kuduOptions).kudu val kuduRelation = kuduRelationFromDataFrame(dataFrame) @@ -923,7 +919,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { def testSocketReadTimeoutPropagation() { kuduOptions = Map( "kudu.table" -> tableName, - "kudu.master" -> miniCluster.getMasterAddressesAsString, + "kudu.master" -> harness.getMasterAddressesAsString, "kudu.socketReadTimeoutMs" -> "1") val dataFrame = sqlContext.read.options(kuduOptions).kudu val kuduRelation = kuduRelationFromDataFrame(dataFrame) http://git-wip-us.apache.org/repos/asf/kudu/blob/ec654c49/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala index 88d0a28..edaac97 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala @@ -114,7 +114,7 @@ class KuduContextTest extends KuduTestSuite with Matchers { insertRows(table, rowCount) val sqlContext = ss.sqlContext val dataDF = sqlContext.read - .options(Map("kudu.master" -> miniCluster.getMasterAddressesAsString, "kudu.table" -> "test")) + .options(Map("kudu.master" -> harness.getMasterAddressesAsString, "kudu.table" -> "test")) .kudu dataDF .sort("key") http://git-wip-us.apache.org/repos/asf/kudu/blob/ec654c49/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala index af6423b..03b2ba2 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala @@ -24,15 +24,12 @@ import scala.collection.immutable.IndexedSeq import org.apache.spark.SparkConf import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder -import org.apache.kudu.client.KuduClient.KuduClientBuilder -import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder import org.apache.kudu.client.CreateTableOptions import org.apache.kudu.client.KuduClient import org.apache.kudu.client.KuduTable -import org.apache.kudu.client.MiniKuduCluster -import org.apache.kudu.junit.RetryRule import org.apache.kudu.Schema import org.apache.kudu.Type +import org.apache.kudu.test.KuduTestHarness import org.apache.kudu.util.DecimalUtil import org.apache.spark.sql.SparkSession import org.junit.After @@ -40,10 +37,10 @@ import org.junit.Before import org.junit.Rule import org.scalatest.junit.JUnitSuite -// TODO (grant): Use BaseKuduTest for most of this. +import scala.annotation.meta.getter + trait KuduTestSuite extends JUnitSuite { var ss: SparkSession = _ - var miniCluster: MiniKuduCluster = _ var kuduClient: KuduClient = _ var table: KuduTable = _ var kuduContext: KuduContext = _ @@ -121,37 +118,28 @@ trait KuduTestSuite extends JUnitSuite { .set("spark.ui.enabled", "false") .set("spark.app.id", appID) - // Add a rule to rerun tests. We use this with Gradle because it doesn't support - // Surefire/Failsafe rerunFailingTestsCount like Maven does. - @Rule - def retryRule = new RetryRule() + // Ensure the annotation is applied to the getter and not the field + // or else Junit will complain that the Rule must be public. + @(Rule @getter) + val harness = new KuduTestHarness() @Before def setUpBase(): Unit = { - miniCluster = new MiniKuduClusterBuilder() - .numMasterServers(1) - .numTabletServers(1) - .build() + kuduClient = harness.getClient ss = SparkSession.builder().config(conf).getOrCreate() - - kuduClient = new KuduClientBuilder(miniCluster.getMasterAddressesAsString).build() - - kuduContext = new KuduContext(miniCluster.getMasterAddressesAsString, ss.sparkContext) + kuduContext = new KuduContext(harness.getMasterAddressesAsString, ss.sparkContext) table = kuduClient.createTable(tableName, schema, tableOptions) val simpleTableOptions = new CreateTableOptions() .setRangePartitionColumns(List("key").asJava) .setNumReplicas(1) - kuduClient.createTable(simpleTableName, simpleSchema, simpleTableOptions) } @After def tearDownBase() { - if (kuduClient != null) kuduClient.shutdown() - if (miniCluster != null) miniCluster.shutdown() if (ss != null) ss.stop() } http://git-wip-us.apache.org/repos/asf/kudu/blob/ec654c49/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala index ff7adec..ce51dc1 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala @@ -34,7 +34,7 @@ class StreamingTest extends KuduTestSuite { def setUp(): Unit = { sqlContext = ss.sqlContext kuduOptions = - Map("kudu.table" -> simpleTableName, "kudu.master" -> miniCluster.getMasterAddressesAsString) + Map("kudu.table" -> simpleTableName, "kudu.master" -> harness.getMasterAddressesAsString) } @Test @@ -49,7 +49,7 @@ class StreamingTest extends KuduTestSuite { .toDF("key", "val") .writeStream .format(classOf[KuduSinkProvider].getCanonicalName) - .option("kudu.master", miniCluster.getMasterAddressesAsString) + .option("kudu.master", harness.getMasterAddressesAsString) .option("kudu.table", simpleTableName) .option("checkpointLocation", checkpointDir.toFile.getCanonicalPath) .outputMode(OutputMode.Update)
