This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit b2039d515bba83fa874f6d5ee7ad2bfc56e1e267
Author: Grant Henke <[email protected]>
AuthorDate: Mon Oct 12 10:08:43 2020 -0500

    KUDU-3202: [build] Add Spark 3 Support

    This patch adds Spark 3 support to the build and builds with Spark 3
    by default, while maintaining the ability to build with Spark 2.

    One simple change to the backup job was required to maintain maximum
    backwards compatibility due to SPARK-31404. Additionally, a few small
    changes were needed to fix SpotBugs issues.

    Follow-on changes will adjust the build scripts and publishing docs
    to publish both Spark 2 and Spark 3 jars.

    Change-Id: I60d5afb1ce65d5bc7547f18ea3382bed6c71883f
    Reviewed-on: http://gerrit.cloudera.org:8080/16582
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <[email protected]>
---
 java/config/spotbugs/excludeFilter.xml             | 20 ++++++++++++++++++++
 java/gradle/dependencies.gradle                    | 13 +++++++++++--
 .../scala/org/apache/kudu/backup/BackupUtils.scala |  8 ++++----
 .../scala/org/apache/kudu/backup/KuduBackup.scala  |  5 +++++
 .../org/apache/kudu/backup/TestKuduBackup.scala    |  2 +-
 .../kudu/spark/tools/TestImportExportFiles.scala   |  2 +-
 6 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/java/config/spotbugs/excludeFilter.xml b/java/config/spotbugs/excludeFilter.xml
index 0a7c55f..64e67cb 100644
--- a/java/config/spotbugs/excludeFilter.xml
+++ b/java/config/spotbugs/excludeFilter.xml
@@ -83,6 +83,8 @@
       <Bug pattern="SE_NO_SERIALVERSIONID"/>
       <!-- HE_HASHCODE_USE_OBJECT_EQUALS: Class defines hashCode() and uses Object.equals(). -->
       <Bug pattern="HE_HASHCODE_USE_OBJECT_EQUALS"/>
+      <!-- MS_PKGPROTECT: Field should be package protected -->
+      <Bug pattern="MS_PKGPROTECT"/>
     </Or>
   </Match>
   <Match>
@@ -143,6 +145,15 @@
     </Or>
     <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED" />
   </Match>
+  <Match>
+    <!-- The casts in these methods are known and safe. -->
+    <Class name="org.apache.kudu.backup.KuduBackupRDD"/>
+    <Or>
+      <Method name="compute" />
+      <Method name="getPreferredLocations" />
+    </Or>
+    <Bug pattern="BC_UNCONFIRMED_CAST" />
+  </Match>

   <!-- kudu-client exclusions -->
   <Match>
@@ -293,6 +304,15 @@
     <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED" />
   </Match>
   <Match>
+    <!-- The casts in these methods are known and safe. -->
+    <Class name="org.apache.kudu.spark.kudu.KuduRDD"/>
+    <Or>
+      <Method name="compute" />
+      <Method name="getPreferredLocations" />
+    </Or>
+    <Bug pattern="BC_UNCONFIRMED_CAST" />
+  </Match>
+  <Match>
     <!-- These fields don't need to be restored in this case. -->
     <Class name="org.apache.kudu.spark.kudu.KuduContext"/>
     <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED" />
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 37fb170..1434903 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -54,11 +54,13 @@ versions += [
     parquet        : "1.11.1",
     protobuf       : "3.13.0",
     ranger         : "2.1.0",
-    scala          : "2.11.12",
+    scala211       : "2.11.12",
+    scala          : "2.12.10",
     scalatest      : "3.2.2",
     scopt          : "3.7.1",
     slf4j          : "1.7.30",
-    spark          : "2.4.7",
+    spark2         : "2.4.7",
+    spark          : "3.0.1",
     spotBugs       : "4.1.1",
     yetus          : "0.12.0"
 ]
@@ -68,6 +70,13 @@ if (gradle.gradleVersion != versions.gradle) {
   println "Using gradle version $gradle.gradleVersion (Build defines $versions.gradle)"
 }

+// If the `spark2` property is passed, override the `spark` and `scala` version
+// to use the `spark2` and `scala211` versions.
+if (propertyExists("spark2")) {
+  versions["spark"] = "$versions.spark2"
+  versions["scala"] = "$versions.scala211"
+}
+
 // Add base Scala version
 versions["scalaBase"] = versions.scala.substring(0, versions.scala.lastIndexOf("."))
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala
index 3224fa9..7701134 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/BackupUtils.scala
@@ -43,12 +43,12 @@ object BackupUtils {
    * The column name can vary because it's accessed positionally.
    */
   private def generateRowActionColumn(schema: Schema): StructField = {
-    var columnName = "backup_row_action"
+    val columnName = new StringBuffer("backup_row_action")
     // If the column already exists and we need to pick an alternate column name.
-    while (schema.hasColumn(columnName)) {
-      columnName += "_"
+    while (schema.hasColumn(columnName.toString)) {
+      columnName.append("_")
     }
-    StructField(columnName, ByteType)
+    StructField(columnName.toString, ByteType)
   }
 }
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index 58e7edb..cddcc40 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -83,6 +83,11 @@ object KuduBackup {
       session.sqlContext
         .createDataFrame(rdd, BackupUtils.dataSchema(table.getSchema, incremental))

+    // Ensure maximum compatibility for dates before 1582-10-15 or timestamps before
+    // 1900-01-01T00:00:00Z in Parquet. Otherwise incorrect values may be read by
+    // Spark 2 or legacy version of Hive. See more details in SPARK-31404.
+    session.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
+
     // Write the data to the backup path.
     // The backup path contains the timestampMs and should not already exist.
     val writer = df.write.mode(SaveMode.ErrorIfExists)
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 9e8e6f2..da8fe58 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -515,7 +515,7 @@ class TestKuduBackup extends KuduTestSuite {
       .addRangePartition(ten, twenty)
       .addRangePartition(twenty, thirty)
       .addRangePartition(thirty, fourty)
-    var table = kuduClient.createTable(tableName, schema, options)
+    val table = kuduClient.createTable(tableName, schema, options)

     // Fill the partitions with rows.
     insertRows(table, 30, 10)
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
index 8629a14..7c5b5e8 100644
--- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
@@ -96,7 +96,7 @@ class TestImportExportFiles extends KuduTestSuite {
   // TODO(KUDU-2454): Use random schemas and random data to ensure all type/values round-trip.
   private def loadSampleData(table: KuduTable, numRows: Int): Unit = {
     val session = kuduClient.newSession()
-    val rows = Range(0, numRows).map { i =>
+    Range(0, numRows).map { i =>
       val insert = table.newInsert
       val row = insert.getRow
       row.addString(0, i.toString)
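
A note on selecting the Spark 2 build, following from the dependencies.gradle
change above: any Gradle invocation that defines the `spark2` project property
(for example `./gradlew assemble -Pspark2`; the task name here is illustrative)
should trigger the override back to Spark 2.4.7 and Scala 2.11.12, while the
default build now uses Spark 3.0.1 and Scala 2.12.10.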

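For context on the SPARK-31404 change in KuduBackup.scala: Spark 3 switched to
the proleptic Gregorian calendar, so Parquet files holding dates before
1582-10-15 or timestamps before 1900-01-01T00:00:00Z can be misread by Spark 2
or legacy Hive unless they are written with the legacy rebase mode. Below is a
minimal standalone Scala sketch of that write-side setting; it is not part of
the patch, and the session setup and output path are illustrative:

    import java.sql.Date
    import org.apache.spark.sql.SparkSession

    object LegacyRebaseSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._

        // The same setting the backup job now applies before writing: rebase
        // old dates/timestamps to the hybrid Julian/Gregorian calendar.
        spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")

        // 1582-10-04 precedes the Gregorian cutover and is affected by the rebase.
        val df = Seq(Date.valueOf("1582-10-04")).toDF("d")
        df.write.parquet("/tmp/legacy-rebase-sketch") // illustrative output path
        spark.stop()
      }
    }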