http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index fc8576e..9bcd73d 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -19,23 +19,28 @@ package org.apache.kudu.spark.kudu

 import scala.collection.JavaConverters._
 import scala.collection.immutable.IndexedSeq
 import scala.util.control.NonFatal
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+import org.apache.spark.sql.types.DataTypes
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.StructType
 import org.junit.Assert._
 import org.scalatest.Matchers

 import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
 import org.apache.kudu.client.CreateTableOptions
-import org.apache.kudu.{Schema, Type}
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.junit.{Before, Test}
+import org.junit.Before
+import org.junit.Test

 class DefaultSourceTest extends KuduTestSuite with Matchers {

   val rowCount = 10
-  var sqlContext : SQLContext = _
-  var rows : IndexedSeq[(Int, Int, String, Long)] = _
-  var kuduOptions : Map[String, String] = _
+  var sqlContext: SQLContext = _
+  var rows: IndexedSeq[(Int, Int, String, Long)] = _
+  var kuduOptions: Map[String, String] = _

   @Before
   def setUp(): Unit = {
@@ -43,9 +48,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {

     sqlContext = ss.sqlContext

-    kuduOptions = Map(
-      "kudu.table" -> tableName,
-      "kudu.master" -> miniCluster.getMasterAddresses)
+    kuduOptions = Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses)

     sqlContext.read.options(kuduOptions).kudu.createOrReplaceTempView(tableName)
   }
@@ -57,15 +60,18 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
       kuduContext.deleteTable(tableName)
     }
     val df = sqlContext.read.options(kuduOptions).kudu
-    kuduContext.createTable(tableName, df.schema, Seq("key"),
-      new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
-        .setNumReplicas(1))
+    kuduContext.createTable(
+      tableName,
+      df.schema,
+      Seq("key"),
+      new CreateTableOptions()
+        .setRangePartitionColumns(List("key").asJava)
+        .setNumReplicas(1))
     kuduContext.insertRows(df, tableName)

     // now use new options to refer to the new table name
-    val newOptions: Map[String, String] = Map(
-      "kudu.table" -> tableName,
-      "kudu.master" -> miniCluster.getMasterAddresses)
+    val newOptions: Map[String, String] =
+      Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses)
     val checkDf = sqlContext.read.options(newOptions).kudu

     assert(checkDf.schema === df.schema)
@@ -90,18 +96,20 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     val upper = kuduSchema.newPartialRow()
     upper.addInt("key", Integer.MAX_VALUE)

-    kuduContext.createTable(tableName, kuduSchema,
+    kuduContext.createTable(
+      tableName,
+      kuduSchema,
       new CreateTableOptions()
         .addHashPartitions(List("key").asJava, 2)
         .setRangePartitionColumns(List("key").asJava)
         .addRangePartition(lower, upper)
-        .setNumReplicas(1))
+        .setNumReplicas(1)
+    )
     kuduContext.insertRows(df, tableName)

     // now use new options to refer to the new table name
-    val newOptions: Map[String, String] = Map(
-      "kudu.table" -> tableName,
-      "kudu.master" -> miniCluster.getMasterAddresses)
+    val newOptions: Map[String, String] =
+      Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses)
     val checkDf = sqlContext.read.options(newOptions).kudu

     assert(checkDf.schema === df.schema)
@@ -115,7 +123,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   @Test
   def testInsertion() {
     val df = sqlContext.read.options(kuduOptions).kudu
-    val changedDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("abc"))
+    val changedDF = df
+      .limit(1)
+      .withColumn("key", df("key").plus(100))
+      .withColumn("c2_s", lit("abc"))
     kuduContext.insertRows(changedDF, tableName)

     val newDF = sqlContext.read.options(kuduOptions).kudu
@@ -128,7 +139,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   @Test
   def testInsertionMultiple() {
     val df = sqlContext.read.options(kuduOptions).kudu
-    val changedDF = df.limit(2).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("abc"))
+    val changedDF = df
+      .limit(2)
+      .withColumn("key", df("key").plus(100))
+      .withColumn("c2_s", lit("abc"))
     kuduContext.insertRows(changedDF, tableName)

     val newDF = sqlContext.read.options(kuduOptions).kudu
@@ -154,7 +168,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)

     // change the key and insert
-    val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+    val insertDF = df
+      .limit(1)
+      .withColumn("key", df("key").plus(100))
+      .withColumn("c2_s", lit("def"))
     kuduContext.insertRows(insertDF, tableName, kuduWriteOptions)

     // read the data back
@@ -183,7 +200,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     updateDF.write.options(newOptions).mode("append").kudu

     // change the key and insert
-    val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+    val insertDF = df
+      .limit(1)
+      .withColumn("key", df("key").plus(100))
+      .withColumn("c2_s", lit("def"))
     insertDF.write.options(newOptions).mode("append").kudu

     // read the data back
@@ -211,7 +231,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     updateDF.write.options(newOptions).mode("append").kudu

     // change the key and insert
-    val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+    val insertDF = df
+      .limit(1)
+      .withColumn("key", df("key").plus(100))
+      .withColumn("c2_s", lit("def"))
     insertDF.write.options(newOptions).mode("append").kudu

     // read the data back
@@ -235,7 +258,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     kuduContext.insertIgnoreRows(updateDF, tableName)

     // change the key and insert
-    val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+    val insertDF = df
+      .limit(1)
+      .withColumn("key", df("key").plus(100))
+      .withColumn("c2_s", lit("def"))
     kuduContext.insertIgnoreRows(insertDF, tableName)

     // read the data back
@@ -259,7 +285,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     kuduContext.upsertRows(updateDF, tableName)

     // change the key and insert
-    val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+    val insertDF = df
+      .limit(1)
+      .withColumn("key", df("key").plus(100))
+      .withColumn("c2_s", lit("def"))
     kuduContext.upsertRows(insertDF, tableName)

     // read the data back
@@ -276,13 +305,18 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {

   @Test
   def testUpsertRowsIgnoreNulls() {
-    val nonNullDF = sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
+    val nonNullDF =
+      sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
     kuduContext.insertRows(nonNullDF, simpleTableName)

-    val dataDF = sqlContext.read.options(Map("kudu.master" -> miniCluster.getMasterAddresses,
-      "kudu.table" -> simpleTableName)).kudu
+    val dataDF = sqlContext.read
+      .options(
+        Map("kudu.master" -> miniCluster.getMasterAddresses, "kudu.table" -> simpleTableName))
+      .kudu

-    val nullDF = sqlContext.createDataFrame(Seq((0, null.asInstanceOf[String]))).toDF("key", "val")
+    val nullDF = sqlContext
+      .createDataFrame(Seq((0, null.asInstanceOf[String])))
+      .toDF("key", "val")
     val kuduWriteOptions = new KuduWriteOptions
     kuduWriteOptions.ignoreNull = true
     kuduContext.upsertRows(nullDF, simpleTableName, kuduWriteOptions)
@@ -303,13 +337,18 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {

   @Test
   def testUpsertRowsIgnoreNullsUsingDefaultSource() {
-    val nonNullDF = sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
+    val nonNullDF =
+      sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
     kuduContext.insertRows(nonNullDF, simpleTableName)

-    val dataDF = sqlContext.read.options(Map("kudu.master" -> miniCluster.getMasterAddresses,
-      "kudu.table" -> simpleTableName)).kudu
+    val dataDF = sqlContext.read
+      .options(
+        Map("kudu.master" -> miniCluster.getMasterAddresses, "kudu.table" -> simpleTableName))
+      .kudu

-    val nullDF = sqlContext.createDataFrame(Seq((0, null.asInstanceOf[String]))).toDF("key", "val")
+    val nullDF = sqlContext
+      .createDataFrame(Seq((0, null.asInstanceOf[String])))
+      .toDF("key", "val")
     val options_0: Map[String, String] = Map(
       "kudu.table" -> simpleTableName,
       "kudu.master" -> miniCluster.getMasterAddresses,
@@ -318,9 +357,8 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     assert(dataDF.collect.toList === nonNullDF.collect.toList)

     kuduContext.updateRows(nonNullDF, simpleTableName)
-    val options_1: Map[String, String] = Map(
-      "kudu.table" -> simpleTableName,
-      "kudu.master" -> miniCluster.getMasterAddresses)
+    val options_1: Map[String, String] =
+      Map("kudu.table" -> simpleTableName, "kudu.master" -> miniCluster.getMasterAddresses)
     nullDF.write.options(options_1).mode("append").kudu

     assert(dataDF.collect.toList === nullDF.collect.toList)
@@ -346,7 +384,8 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {

   @Test
   def testOutOfOrderSelection() {
-    val df = sqlContext.read.options(kuduOptions).kudu.select( "c2_s", "c1_i", "key")
+    val df =
+      sqlContext.read.options(kuduOptions).kudu.select("c2_s", "c1_i", "key")
     val collected = df.collect()
     assert(collected(0).getString(0).equals("0"))
   }
@@ -383,61 +422,91 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {

   @Test
   def testTableScanWithProjectionAndPredicateDouble() {
-    assertEquals(rows.count { case (key, i, s, ts) => i > 5 },
-      sqlContext.sql(s"""SELECT key, c3_double FROM $tableName where c3_double > "5.0"""").count())
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c3_double FROM $tableName where c3_double > "5.0"""")
+        .count())
   }

   @Test
   def testTableScanWithProjectionAndPredicateLong() {
-    assertEquals(rows.count { case (key, i, s, ts) => i > 5 },
-      sqlContext.sql(s"""SELECT key, c4_long FROM $tableName where c4_long > "5"""").count())
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c4_long FROM $tableName where c4_long > "5"""")
+        .count())
   }

   @Test
   def testTableScanWithProjectionAndPredicateBool() {
-    assertEquals(rows.count { case (key, i, s, ts) => i % 2==0 },
-      sqlContext.sql(s"""SELECT key, c5_bool FROM $tableName where c5_bool = true""").count())
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i % 2 == 0 },
+      sqlContext
+        .sql(s"""SELECT key, c5_bool FROM $tableName where c5_bool = true""")
+        .count())
   }

   @Test
   def testTableScanWithProjectionAndPredicateShort() {
-    assertEquals(rows.count { case (key, i, s, ts) => i > 5},
-      sqlContext.sql(s"""SELECT key, c6_short FROM $tableName where c6_short > 5""").count())
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c6_short FROM $tableName where c6_short > 5""")
+        .count())
   }

   @Test
   def testTableScanWithProjectionAndPredicateFloat() {
-    assertEquals(rows.count { case (key, i, s, ts) => i > 5},
-      sqlContext.sql(s"""SELECT key, c7_float FROM $tableName where c7_float > 5""").count())
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c7_float FROM $tableName where c7_float > 5""")
+        .count())
   }

   @Test
   def testTableScanWithProjectionAndPredicateDecimal32() {
-    assertEquals(rows.count { case (key, i, s, ts) => i > 5},
-      sqlContext.sql(s"""SELECT key, c11_decimal32 FROM $tableName where c11_decimal32 > 5""").count())
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c11_decimal32 FROM $tableName where c11_decimal32 > 5""")
+        .count())
   }

   @Test
   def testTableScanWithProjectionAndPredicateDecimal64() {
-    assertEquals(rows.count { case (key, i, s, ts) => i > 5},
-      sqlContext.sql(s"""SELECT key, c12_decimal64 FROM $tableName where c12_decimal64 > 5""").count())
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c12_decimal64 FROM $tableName where c12_decimal64 > 5""")
+        .count())
   }

   @Test
   def testTableScanWithProjectionAndPredicateDecimal128() {
-    assertEquals(rows.count { case (key, i, s, ts) => i > 5},
-      sqlContext.sql(s"""SELECT key, c13_decimal128 FROM $tableName where c13_decimal128 > 5""").count())
+    assertEquals(
+      rows.count { case (key, i, s, ts) => i > 5 },
+      sqlContext
+        .sql(s"""SELECT key, c13_decimal128 FROM $tableName where c13_decimal128 > 5""")
+        .count())
   }

   @Test
   def testTableScanWithProjectionAndPredicate() {
-    assertEquals(rows.count { case (key, i, s, ts) => s != null && s > "5" },
-      sqlContext.sql(s"""SELECT key FROM $tableName where c2_s > "5"""").count())
+    assertEquals(
+      rows.count { case (key, i, s, ts) => s != null && s > "5" },
+      sqlContext
+        .sql(s"""SELECT key FROM $tableName where c2_s > "5"""")
+        .count())

-    assertEquals(rows.count { case (key, i, s, ts) => s != null },
-      sqlContext.sql(s"""SELECT key, c2_s FROM $tableName where c2_s IS NOT NULL""").count())
+    assertEquals(
+      rows.count { case (key, i, s, ts) => s != null },
+      sqlContext
+        .sql(s"""SELECT key, c2_s FROM $tableName where c2_s IS NOT NULL""")
+        .count())
   }

   @Test
@@ -459,7 +528,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {

   @Test
   def testBasicSparkSQLWithPredicate() {
-    val results = sqlContext.sql("SELECT key FROM " + tableName + " where key=1").collectAsList()
sqlContext.sql("SELECT key FROM " + tableName + " where key=1").collectAsList() + val results = sqlContext + .sql("SELECT key FROM " + tableName + " where key=1") + .collectAsList() assert(results.size() == 1) assert(results.get(0).size.equals(1)) assert(results.get(0).getInt(0).equals(1)) @@ -468,7 +539,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { @Test def testBasicSparkSQLWithTwoPredicates() { - val results = sqlContext.sql("SELECT key FROM " + tableName + " where key=2 and c2_s='2'").collectAsList() + val results = sqlContext + .sql("SELECT key FROM " + tableName + " where key=2 and c2_s='2'") + .collectAsList() assert(results.size() == 1) assert(results.get(0).size.equals(1)) assert(results.get(0).getInt(0).equals(2)) @@ -477,45 +550,58 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { @Test def testBasicSparkSQLWithInListPredicate() { val keys = Array(1, 5, 7) - val results = sqlContext.sql(s"SELECT key FROM $tableName where key in (${keys.mkString(", ")})").collectAsList() + val results = sqlContext + .sql(s"SELECT key FROM $tableName where key in (${keys.mkString(", ")})") + .collectAsList() assert(results.size() == keys.length) - keys.zipWithIndex.foreach { case (v, i) => - assert(results.get(i).size.equals(1)) - assert(results.get(i).getInt(0).equals(v)) + keys.zipWithIndex.foreach { + case (v, i) => + assert(results.get(i).size.equals(1)) + assert(results.get(i).getInt(0).equals(v)) } } @Test def testBasicSparkSQLWithInListPredicateOnString() { val keys = Array(1, 4, 6) - val results = sqlContext.sql(s"SELECT key FROM $tableName where c2_s in (${keys.mkString("'", "', '", "'")})").collectAsList() + val results = sqlContext + .sql(s"SELECT key FROM $tableName where c2_s in (${keys.mkString("'", "', '", "'")})") + .collectAsList() assert(results.size() == keys.count(_ % 2 == 0)) - keys.filter(_ % 2 == 0).zipWithIndex.foreach { case (v, i) => - assert(results.get(i).size.equals(1)) - assert(results.get(i).getInt(0).equals(v)) + keys.filter(_ % 2 == 0).zipWithIndex.foreach { + case (v, i) => + assert(results.get(i).size.equals(1)) + assert(results.get(i).getInt(0).equals(v)) } } @Test def testBasicSparkSQLWithInListAndComparisonPredicate() { val keys = Array(1, 5, 7) - val results = sqlContext.sql(s"SELECT key FROM $tableName where key>2 and key in (${keys.mkString(", ")})").collectAsList() - assert(results.size() == keys.count(_>2)) - keys.filter(_>2).zipWithIndex.foreach { case (v, i) => - assert(results.get(i).size.equals(1)) - assert(results.get(i).getInt(0).equals(v)) + val results = sqlContext + .sql(s"SELECT key FROM $tableName where key>2 and key in (${keys.mkString(", ")})") + .collectAsList() + assert(results.size() == keys.count(_ > 2)) + keys.filter(_ > 2).zipWithIndex.foreach { + case (v, i) => + assert(results.get(i).size.equals(1)) + assert(results.get(i).getInt(0).equals(v)) } } @Test def testBasicSparkSQLWithTwoPredicatesNegative() { - val results = sqlContext.sql("SELECT key FROM " + tableName + " where key=1 and c2_s='2'").collectAsList() + val results = sqlContext + .sql("SELECT key FROM " + tableName + " where key=1 and c2_s='2'") + .collectAsList() assert(results.size() == 0) } @Test def testBasicSparkSQLWithTwoPredicatesIncludingString() { - val results = sqlContext.sql("SELECT key FROM " + tableName + " where c2_s='2'").collectAsList() + val results = sqlContext + .sql("SELECT key FROM " + tableName + " where c2_s='2'") + .collectAsList() assert(results.size() == 1) assert(results.get(0).size.equals(1)) 
     assert(results.get(0).getInt(0).equals(2))
@@ -523,7 +609,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {

   @Test
   def testBasicSparkSQLWithTwoPredicatesAndProjection() {
-    val results = sqlContext.sql("SELECT key, c2_s FROM " + tableName + " where c2_s='2'").collectAsList()
+    val results = sqlContext
+      .sql("SELECT key, c2_s FROM " + tableName + " where c2_s='2'")
+      .collectAsList()
     assert(results.size() == 1)
     assert(results.get(0).size.equals(2))
     assert(results.get(0).getInt(0).equals(2))
@@ -532,7 +620,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {

   @Test
   def testBasicSparkSQLWithTwoPredicatesGreaterThan() {
-    val results = sqlContext.sql("SELECT key, c2_s FROM " + tableName + " where c2_s>='2'").collectAsList()
+    val results = sqlContext
+      .sql("SELECT key, c2_s FROM " + tableName + " where c2_s>='2'")
+      .collectAsList()
     assert(results.size() == 4)
     assert(results.get(0).size.equals(2))
     assert(results.get(0).getInt(0).equals(2))
@@ -543,15 +633,21 @@ def testSparkSQLStringStartsWithFilters() {
     // This test requires a special table.
     val testTableName = "startswith"
-    val schema = new Schema(List(
-      new ColumnSchemaBuilder("key", Type.STRING).key(true).build()).asJava)
-    val tableOptions = new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+    val schema = new Schema(
+      List(new ColumnSchemaBuilder("key", Type.STRING).key(true).build()).asJava)
+    val tableOptions = new CreateTableOptions()
+      .setRangePartitionColumns(List("key").asJava)
       .setNumReplicas(1)
     val testTable = kuduClient.createTable(testTableName, schema, tableOptions)
     val kuduSession = kuduClient.newSession()
     val chars = List('a', 'b', '乕', Char.MaxValue, '\u0000')
-    val keys = for (x <- chars; y <- chars; z <- chars; w <- chars) yield Array(x, y, z, w).mkString
+    val keys = for {
+      x <- chars
+      y <- chars
+      z <- chars
+      w <- chars
+    } yield Array(x, y, z, w).mkString
     keys.foreach { key =>
       val insert = testTable.newInsert
       val row = insert.getRow
@@ -559,9 +655,8 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
       row.addString(0, key)
       kuduSession.apply(insert)
     }
-    val options: Map[String, String] = Map(
-      "kudu.table" -> testTableName,
-      "kudu.master" -> miniCluster.getMasterAddresses)
+    val options: Map[String, String] =
+      Map("kudu.table" -> testTableName, "kudu.master" -> miniCluster.getMasterAddresses)
     sqlContext.read.options(options).kudu.createOrReplaceTempView(testTableName)

     val checkPrefixCount = { prefix: String =>
@@ -575,26 +670,37 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
       checkPrefixCount(Array(x).mkString)
     }
     // all two character combos
-    for (x <- chars; y <- chars) {
+    for {
+      x <- chars
+      y <- chars
+    } {
       checkPrefixCount(Array(x, y).mkString)
     }
   }

   @Test
   def testSparkSQLIsNullPredicate() {
-    var results = sqlContext.sql("SELECT key FROM " + tableName + " where c2_s IS NULL").collectAsList()
+    var results = sqlContext
+      .sql("SELECT key FROM " + tableName + " where c2_s IS NULL")
+      .collectAsList()
     assert(results.size() == 5)

-    results = sqlContext.sql("SELECT key FROM " + tableName + " where key IS NULL").collectAsList()
+    results = sqlContext
+      .sql("SELECT key FROM " + tableName + " where key IS NULL")
+      .collectAsList()
     assert(results.isEmpty())
   }

   @Test
   def testSparkSQLIsNotNullPredicate() {
-    var results = sqlContext.sql("SELECT key FROM " + tableName + " where c2_s IS NOT NULL").collectAsList()
+    var results = sqlContext
+      .sql("SELECT key FROM " + tableName + " where c2_s IS NOT NULL")
" where c2_s IS NOT NULL") + .collectAsList() assert(results.size() == 5) - results = sqlContext.sql("SELECT key FROM " + tableName + " where key IS NOT NULL").collectAsList() + results = sqlContext + .sql("SELECT key FROM " + tableName + " where key IS NOT NULL") + .collectAsList() assert(results.size() == 10) } @@ -604,17 +710,24 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { // read 0 rows just to get the schema val df = sqlContext.sql(s"SELECT * FROM $tableName LIMIT 0") - kuduContext.createTable(insertTable, df.schema, Seq("key"), - new CreateTableOptions().setRangePartitionColumns(List("key").asJava) + kuduContext.createTable( + insertTable, + df.schema, + Seq("key"), + new CreateTableOptions() + .setRangePartitionColumns(List("key").asJava) .setNumReplicas(1)) - val newOptions: Map[String, String] = Map( - "kudu.table" -> insertTable, - "kudu.master" -> miniCluster.getMasterAddresses) - sqlContext.read.options(newOptions).kudu.createOrReplaceTempView(insertTable) + val newOptions: Map[String, String] = + Map("kudu.table" -> insertTable, "kudu.master" -> miniCluster.getMasterAddresses) + sqlContext.read + .options(newOptions) + .kudu + .createOrReplaceTempView(insertTable) sqlContext.sql(s"INSERT INTO TABLE $insertTable SELECT * FROM $tableName") - val results = sqlContext.sql(s"SELECT key FROM $insertTable").collectAsList() + val results = + sqlContext.sql(s"SELECT key FROM $insertTable").collectAsList() assertEquals(10, results.size()) } @@ -624,14 +737,20 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { // read 0 rows just to get the schema val df = sqlContext.sql(s"SELECT * FROM $tableName LIMIT 0") - kuduContext.createTable(insertTable, df.schema, Seq("key"), - new CreateTableOptions().setRangePartitionColumns(List("key").asJava) + kuduContext.createTable( + insertTable, + df.schema, + Seq("key"), + new CreateTableOptions() + .setRangePartitionColumns(List("key").asJava) .setNumReplicas(1)) - val newOptions: Map[String, String] = Map( - "kudu.table" -> insertTable, - "kudu.master" -> miniCluster.getMasterAddresses) - sqlContext.read.options(newOptions).kudu.createOrReplaceTempView(insertTable) + val newOptions: Map[String, String] = + Map("kudu.table" -> insertTable, "kudu.master" -> miniCluster.getMasterAddresses) + sqlContext.read + .options(newOptions) + .kudu + .createOrReplaceTempView(insertTable) try { sqlContext.sql(s"INSERT OVERWRITE TABLE $insertTable SELECT * FROM $tableName") @@ -648,13 +767,16 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { val df = sqlContext.read.options(kuduOptions).kudu val newTable = "testwritedatasourcetable" - kuduContext.createTable(newTable, df.schema, Seq("key"), - new CreateTableOptions().setRangePartitionColumns(List("key").asJava) - .setNumReplicas(1)) + kuduContext.createTable( + newTable, + df.schema, + Seq("key"), + new CreateTableOptions() + .setRangePartitionColumns(List("key").asJava) + .setNumReplicas(1)) - val newOptions: Map[String, String] = Map( - "kudu.table" -> newTable, - "kudu.master" -> miniCluster.getMasterAddresses) + val newOptions: Map[String, String] = + Map("kudu.table" -> newTable, "kudu.master" -> miniCluster.getMasterAddresses) df.write.options(newOptions).mode("append").kudu val checkDf = sqlContext.read.options(newOptions).kudu @@ -666,15 +788,17 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { @Test def testCreateRelationWithSchema() { // user-supplied schema that is compatible with actual schema, but with the key at the end - val userSchema: 
-      StructField("c4_long", DataTypes.LongType),
-      StructField("key", DataTypes.IntegerType)
-    ))
+    val userSchema: StructType = StructType(
+      List(
+        StructField("c4_long", DataTypes.LongType),
+        StructField("key", DataTypes.IntegerType)
+      ))

     val dfDefaultSchema = sqlContext.read.options(kuduOptions).kudu
     assertEquals(14, dfDefaultSchema.schema.fields.length)

-    val dfWithUserSchema = sqlContext.read.options(kuduOptions).schema(userSchema).kudu
+    val dfWithUserSchema =
+      sqlContext.read.options(kuduOptions).schema(userSchema).kudu
     assertEquals(2, dfWithUserSchema.schema.fields.length)

     dfWithUserSchema.limit(10).collect()
@@ -684,14 +808,15 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   @Test
   def testCreateRelationWithInvalidSchema() {
     // user-supplied schema that is NOT compatible with actual schema
-    val userSchema: StructType = StructType(List(
-      StructField("foo", DataTypes.LongType),
-      StructField("bar", DataTypes.IntegerType)
-    ))
+    val userSchema: StructType = StructType(
+      List(
+        StructField("foo", DataTypes.LongType),
+        StructField("bar", DataTypes.IntegerType)
+      ))

     intercept[IllegalArgumentException] {
       sqlContext.read.options(kuduOptions).schema(userSchema).kudu
-    }.getMessage should include ("Unknown column: foo")
+    }.getMessage should include("Unknown column: foo")
   }

   @Test
@@ -715,10 +840,13 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   @Test
   def testTimestampPropagation() {
     val df = sqlContext.read.options(kuduOptions).kudu
-    val insertDF = df.limit(1)
-      .withColumn("key", df("key")
-        .plus(100))
-      .withColumn("c2_s", lit("abc"))
+    val insertDF = df
+      .limit(1)
+      .withColumn(
+        "key",
+        df("key")
+          .plus(100))
+      .withColumn("c2_s", lit("abc"))

     // Initiate a write via KuduContext, and verify that the client should
     // have propagated timestamp.
@@ -743,10 +871,13 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {

     // Initiate another write via KuduContext, and verify that the client should
     // move the propagated timestamp further.
-    val updateDF = df.limit(1)
-      .withColumn("key", df("key")
-        .plus(100))
-      .withColumn("c2_s", lit("def"))
+    val updateDF = df
+      .limit(1)
+      .withColumn(
+        "key",
+        df("key")
+          .plus(100))
+      .withColumn("c2_s", lit("def"))
     val kuduWriteOptions = new KuduWriteOptions
     kuduWriteOptions.ignoreDuplicateRowErrors = true
     kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)
@@ -754,10 +885,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   }

   /**
-   * Assuming that the only part of the logical plan is a Kudu scan, this
-   * function extracts the KuduRelation from the passed DataFrame for
-   * testing purposes.
-   */
+    * Assuming that the only part of the logical plan is a Kudu scan, this
+    * function extracts the KuduRelation from the passed DataFrame for
+    * testing purposes.
+    */
   def kuduRelationFromDataFrame(dataFrame: DataFrame) = {
     val logicalPlan = dataFrame.queryExecution.logical
     val logicalRelation = logicalPlan.asInstanceOf[LogicalRelation]
@@ -766,10 +897,10 @@
   }

   /**
-   * Verify that the kudu.scanRequestTimeoutMs parameter is parsed by the
-   * DefaultSource and makes it into the KuduRelation as a configuration
-   * parameter.
-   */
+    * Verify that the kudu.scanRequestTimeoutMs parameter is parsed by the
+    * DefaultSource and makes it into the KuduRelation as a configuration
+    * parameter.
+    */
   @Test
   def testScanRequestTimeoutPropagation() {
     kuduOptions = Map(
@@ -782,10 +913,10 @@
   }

   /**
-   * Verify that the kudu.socketReadTimeoutMs parameter is parsed by the
-   * DefaultSource and makes it into the KuduRelation as a configuration
-   * parameter.
-   */
+    * Verify that the kudu.socketReadTimeoutMs parameter is parsed by the
+    * DefaultSource and makes it into the KuduRelation as a configuration
+    * parameter.
+    */
   @Test
   def testSocketReadTimeoutPropagation() {
     kuduOptions = Map(
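
(Not part of the diff: a standalone sketch of the read/write pattern the DefaultSourceTest
cases above exercise. `masterAddresses` and `spark` are assumptions, standing in for a
reachable comma-separated list of Kudu master addresses and an active SparkSession;
"my_table" is a placeholder table name.)

    import org.apache.kudu.spark.kudu._ // brings in the .kudu extension used throughout the tests

    val kuduOpts = Map(
      "kudu.master" -> masterAddresses, // assumed: master RPC addresses
      "kudu.table" -> "my_table") // assumed: an existing Kudu table

    // Read the table into a DataFrame through the default source,
    // exactly as the tests do with sqlContext.read.options(...).kudu.
    val df = spark.sqlContext.read.options(kuduOpts).kudu

    // Write rows back in append mode, the write path the tests drive.
    df.write.options(kuduOpts).mode("append").kudu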
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
index 2329eff..968b6a6 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
@@ -16,7 +16,10 @@
  */
 package org.apache.kudu.spark.kudu

-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.ObjectInputStream
+import java.io.ObjectOutputStream
 import java.math.BigDecimal
 import java.sql.Timestamp

@@ -61,10 +64,29 @@ class KuduContextTest extends KuduTestSuite with Matchers {
   @Test
   def testBasicKuduRDD() {
     val rows = insertRows(table, rowCount)
-    val scanList = kuduContext.kuduRDD(ss.sparkContext, "test", Seq("key", "c1_i", "c2_s", "c3_double",
-      "c4_long", "c5_bool", "c6_short", "c7_float", "c8_binary", "c9_unixtime_micros", "c10_byte",
-      "c11_decimal32", "c12_decimal64", "c13_decimal128"))
-      .map(r => r.toSeq).collect()
+    val scanList = kuduContext
+      .kuduRDD(
+        ss.sparkContext,
+        "test",
+        Seq(
+          "key",
+          "c1_i",
+          "c2_s",
+          "c3_double",
+          "c4_long",
+          "c5_bool",
+          "c6_short",
+          "c7_float",
+          "c8_binary",
+          "c9_unixtime_micros",
+          "c10_byte",
+          "c11_decimal32",
+          "c12_decimal64",
+          "c13_decimal128"
+        )
+      )
+      .map(r => r.toSeq)
+      .collect()
     scanList.foreach(r => {
       val index = r.apply(0).asInstanceOf[Int]
       assert(r.apply(0).asInstanceOf[Int] == rows.apply(index)._1)
@@ -72,13 +94,14 @@ class KuduContextTest extends KuduTestSuite with Matchers {
       assert(r.apply(2).asInstanceOf[String] == rows.apply(index)._3)
       assert(r.apply(3).asInstanceOf[Double] == rows.apply(index)._2.toDouble)
       assert(r.apply(4).asInstanceOf[Long] == rows.apply(index)._2.toLong)
-      assert(r.apply(5).asInstanceOf[Boolean] == (rows.apply(index)._2%2==1))
+      assert(r.apply(5).asInstanceOf[Boolean] == (rows.apply(index)._2 % 2 == 1))
       assert(r.apply(6).asInstanceOf[Short] == rows.apply(index)._2.toShort)
       assert(r.apply(7).asInstanceOf[Float] == rows.apply(index)._2.toFloat)
       val binaryBytes = s"bytes ${rows.apply(index)._2}".getBytes().toSeq
       assert(r.apply(8).asInstanceOf[Array[Byte]].toSeq == binaryBytes)
-      assert(r.apply(9).asInstanceOf[Timestamp] ==
-        TimestampUtil.microsToTimestamp(rows.apply(index)._4))
+      assert(
+        r.apply(9).asInstanceOf[Timestamp] ==
+          TimestampUtil.microsToTimestamp(rows.apply(index)._4))
       assert(r.apply(10).asInstanceOf[Byte] == rows.apply(index)._2.toByte)
       assert(r.apply(11).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
       assert(r.apply(12).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
@@ -90,12 +113,23 @@ class KuduContextTest extends KuduTestSuite with Matchers {
   def testKuduSparkDataFrame() {
     insertRows(table, rowCount)
     val sqlContext = ss.sqlContext
-    val dataDF = sqlContext.read.options(Map("kudu.master" -> miniCluster.getMasterAddresses,
-      "kudu.table" -> "test")).kudu
-    dataDF.sort("key").select("c8_binary").first.get(0)
-      .asInstanceOf[Array[Byte]].shouldBe("bytes 0".getBytes)
+    val dataDF = sqlContext.read
+      .options(Map("kudu.master" -> miniCluster.getMasterAddresses, "kudu.table" -> "test"))
+      .kudu
+    dataDF
.sort("key") + .select("c8_binary") + .first + .get(0) + .asInstanceOf[Array[Byte]] + .shouldBe("bytes 0".getBytes) // decode the binary to string and compare - dataDF.sort("key").withColumn("c8_binary", decode(dataDF("c8_binary"), "UTF-8")) - .select("c8_binary").first.get(0).shouldBe("bytes 0") + dataDF + .sort("key") + .withColumn("c8_binary", decode(dataDF("c8_binary"), "UTF-8")) + .select("c8_binary") + .first + .get(0) + .shouldBe("bytes 0") } } http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala index 65f8215..ed761cc 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala @@ -26,12 +26,18 @@ import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder import org.apache.kudu.client.KuduClient.KuduClientBuilder import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder -import org.apache.kudu.client.{CreateTableOptions, KuduClient, KuduTable, MiniKuduCluster} +import org.apache.kudu.client.CreateTableOptions +import org.apache.kudu.client.KuduClient +import org.apache.kudu.client.KuduTable +import org.apache.kudu.client.MiniKuduCluster import org.apache.kudu.junit.RetryRule -import org.apache.kudu.{Schema, Type} +import org.apache.kudu.Schema +import org.apache.kudu.Type import org.apache.kudu.util.DecimalUtil import org.apache.spark.sql.SparkSession -import org.junit.{After, Before, Rule} +import org.junit.After +import org.junit.Before +import org.junit.Rule import org.scalatest.junit.JUnitSuite // TODO (grant): Use BaseKuduTest for most of this. 
@@ -56,21 +62,32 @@ trait KuduTestSuite extends JUnitSuite {
       new ColumnSchemaBuilder("c6_short", Type.INT16).build(),
       new ColumnSchemaBuilder("c7_float", Type.FLOAT).build(),
       new ColumnSchemaBuilder("c8_binary", Type.BINARY).build(),
-      new ColumnSchemaBuilder("c9_unixtime_micros", Type.UNIXTIME_MICROS).build(),
+      new ColumnSchemaBuilder("c9_unixtime_micros", Type.UNIXTIME_MICROS)
+        .build(),
       new ColumnSchemaBuilder("c10_byte", Type.INT8).build(),
       new ColumnSchemaBuilder("c11_decimal32", Type.DECIMAL)
         .typeAttributes(
-          new ColumnTypeAttributesBuilder().precision(DecimalUtil.MAX_DECIMAL32_PRECISION).build()
-        ).build(),
+          new ColumnTypeAttributesBuilder()
+            .precision(DecimalUtil.MAX_DECIMAL32_PRECISION)
+            .build()
+        )
+        .build(),
       new ColumnSchemaBuilder("c12_decimal64", Type.DECIMAL)
         .typeAttributes(
-          new ColumnTypeAttributesBuilder().precision(DecimalUtil.MAX_DECIMAL64_PRECISION).build()
-        ).build(),
+          new ColumnTypeAttributesBuilder()
+            .precision(DecimalUtil.MAX_DECIMAL64_PRECISION)
+            .build()
+        )
+        .build(),
       new ColumnSchemaBuilder("c13_decimal128", Type.DECIMAL)
         .typeAttributes(
-          new ColumnTypeAttributesBuilder().precision(DecimalUtil.MAX_DECIMAL128_PRECISION).build()
-        ).build())
-    new Schema(columns.asJava)
+          new ColumnTypeAttributesBuilder()
+            .precision(DecimalUtil.MAX_DECIMAL128_PRECISION)
+            .build()
+        )
+        .build()
+    )
+    new Schema(columns.asJava)
   }

   lazy val simpleSchema: Schema = {
@@ -93,13 +110,16 @@ trait KuduTestSuite extends JUnitSuite {
       .setNumReplicas(1)
   }

-  val appID: String = new Date().toString + math.floor(math.random * 10E4).toLong.toString
+  val appID: String = new Date().toString + math
+    .floor(math.random * 10E4)
+    .toLong
+    .toString

-  val conf: SparkConf = new SparkConf().
-    setMaster("local[*]").
-    setAppName("test").
-    set("spark.ui.enabled", "false").
-    set("spark.app.id", appID)
+  val conf: SparkConf = new SparkConf()
+    .setMaster("local[*]")
+    .setAppName("test")
+    .set("spark.ui.enabled", "false")
+    .set("spark.app.id", appID)

   // Add a rule to rerun tests. We use this with Gradle because it doesn't support
   // Surefire/Failsafe rerunFailingTestsCount like Maven does.
@@ -121,7 +141,6 @@ trait KuduTestSuite extends JUnitSuite {

     table = kuduClient.createTable(tableName, schema, tableOptions)

-
     val simpleTableOptions = new CreateTableOptions()
       .setRangePartitionColumns(List("key").asJava)
       .setNumReplicas(1)
@@ -143,7 +162,9 @@ trait KuduTestSuite extends JUnitSuite {
     kuduSession.apply(delete)
   }

-  def insertRows(targetTable: KuduTable, rowCount: Integer): IndexedSeq[(Int, Int, String, Long)] = {
+  def insertRows(
+      targetTable: KuduTable,
+      rowCount: Integer): IndexedSeq[(Int, Int, String, Long)] = {
     val kuduSession = kuduClient.newSession()

     val rows = Range(0, rowCount).map { i =>
@@ -153,7 +174,7 @@ trait KuduTestSuite extends JUnitSuite {
       row.addInt(1, i)
      row.addDouble(3, i.toDouble)
       row.addLong(4, i.toLong)
-      row.addBoolean(5, i%2==1)
+      row.addBoolean(5, i % 2 == 1)
       row.addShort(6, i.toShort)
       row.addFloat(7, i.toFloat)
       row.addBinary(8, s"bytes $i".getBytes())

http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 8aec568..2373dbe 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -62,8 +62,7 @@
         <maven-shade-plugin.version>3.1.1</maven-shade-plugin.version>
         <maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version>
         <scala-maven-plugin.version>3.3.2</scala-maven-plugin.version>
-        <mvn-scalafmt-plugin.version>0.7</mvn-scalafmt-plugin.version>
-        <scalafmt.version>1.4.0</scalafmt.version>
+        <mvn-scalafmt-plugin.version>0.7_1.4.0</mvn-scalafmt-plugin.version>

         <!-- Library dependencies -->
         <async.version>1.4.1</async.version>
@@ -184,10 +183,9 @@
             <plugin>
                 <groupId>org.antipathy</groupId>
                 <artifactId>mvn-scalafmt</artifactId>
-                <version>${mvn-scalafmt-plugin.version}_${scalafmt.version}</version>
+                <version>${mvn-scalafmt-plugin.version}</version>
                 <configuration>
-                    <parameters>--diff</parameters>
-                    <configLocation>${user.dir}/.scalafmt.conf</configLocation>
+                    <configLocation>.scalafmt.conf</configLocation>
                 </configuration>
                 <executions>
                     <execution>
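
(Not part of the diff: the .scalafmt.conf that <configLocation> points at is not included in
this commit. Below is a minimal sketch of a configuration that would produce the style visible
in the hunks above, namely one import per line, curly-brace for-comprehensions, and wrapped
call chains; every option value here is an assumption, not the committed file.)

    # .scalafmt.conf (sketch; values assumed, not taken from this commit)
    maxColumn = 90
    # Scaladoc-style comment gutter, matching the re-indented /** ... */ blocks above
    docstrings = ScalaDoc
    rewrite.rules = [
      ExpandImportSelectors  # import a.{B, C} becomes one import per line
      PreferCurlyFors        # for (x <- xs; y <- ys) becomes a curly-brace for
    ]

With the plugin pinned to 0.7_1.4.0 as above, formatting runs through the configured
execution, or on demand with something like `mvn org.antipathy:mvn-scalafmt:format`
(goal name assumed).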