This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 759f04c7ec868bb1cec4a196e47cf988af8875fd Author: Grant Henke <[email protected]> AuthorDate: Thu Feb 6 15:05:16 2020 -0600 KUDU-3049: [spark] Automatic handling of schema drift This patch adds a new write option `kudu.handleSchemaDrift`. If set to true, when new fields are encountered the Kudu table will be altered to include new columns for those fields. Change-Id: Ib1edebb293d6ae79c26a0ecb9ce7755308f667f4 Reviewed-on: http://gerrit.cloudera.org:8080/15176 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> --- .../org/apache/kudu/client/TestAlterTable.java | 41 ++++++++++++++ .../org/apache/kudu/client/TestKuduClient.java | 63 ++++++++++++++++++++++ .../org/apache/kudu/spark/kudu/DefaultSource.scala | 10 +++- .../org/apache/kudu/spark/kudu/KuduContext.scala | 26 +++++++++ .../apache/kudu/spark/kudu/KuduWriteOptions.scala | 7 ++- .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 59 ++++++++++++++++++++ 6 files changed, 204 insertions(+), 2 deletions(-) diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java index d1f5cb0..715ced4 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java @@ -35,9 +35,11 @@ import java.util.Map; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.function.ThrowingRunnable; import org.apache.kudu.ColumnSchema; import org.apache.kudu.ColumnSchema.CompressionAlgorithm; @@ -156,6 +158,18 @@ public class TestAlterTable { ", INT32 addNullable=101, INT32 addNullableDef=NULL"); Collections.sort(expected); assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + + NonRecoverableException thrown = + Assert.assertThrows(NonRecoverableException.class, new ThrowingRunnable() { + @Override + public void run() throws Exception { + // Add duplicate column + client.alterTable(tableName, new AlterTableOptions() + .addNullableColumn("addNullable", Type.INT32)); + } + }); + Assert.assertTrue(thrown.getStatus().isAlreadyPresent()); + Assert.assertTrue(thrown.getMessage().contains("The column already exists")); } @Test @@ -523,4 +537,31 @@ public class TestAlterTable { table = client.openTable(tableName); assertTrue(table.getExtraConfig().isEmpty()); } + + @Test + @KuduTestHarness.MasterServerConfig(flags = { "--max_num_columns=10" }) + public void testAlterExceedsColumnLimit() throws Exception { + ArrayList<ColumnSchema> columns = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + columns.add(new ColumnSchema.ColumnSchemaBuilder(Integer.toString(i), Type.INT32) + .key(i == 0) + .build()); + } + Schema schema = new Schema(columns); + CreateTableOptions createOptions = + new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("0")); + client.createTable(tableName, schema, createOptions); + + NonRecoverableException thrown = + Assert.assertThrows(NonRecoverableException.class, new ThrowingRunnable() { + @Override + public void run() throws Exception { + client.alterTable(tableName, + new AlterTableOptions().addNullableColumn("11", Type.INT32)); + } + }); + Assert.assertTrue(thrown.getStatus().isInvalidArgument()); + Assert.assertTrue(thrown.getMessage() + .contains("number of columns 11 is greater than the permitted maximum 10")); + } } diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java index 1fc7d99..4a7a473 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java @@ -1344,4 +1344,67 @@ public class TestKuduClient { assertTrue("Missing warning:\n" + loggedText, loggedText.contains("this is unsafe")); } + + @Test(timeout = 100000) + public void testSchemaDriftPattern() throws Exception { + KuduTable table = client.createTable( + TABLE_NAME, createManyStringsSchema(), getBasicCreateTableOptions().setWait(false)); + KuduSession session = client.newSession(); + + // Insert a row. + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addString("key", "key_0"); + row.addString("c1", "c1_0"); + row.addString("c2", "c2_0"); + row.addString("c3", "c3_0"); + row.addString("c4", "c4_0"); + OperationResponse resp = session.apply(insert); + assertFalse(resp.hasRowError()); + + // Insert a row with an extra column. + boolean retried = false; + while (true) { + try { + Insert insertExtra = table.newInsert(); + PartialRow rowExtra = insertExtra.getRow(); + rowExtra.addString("key", "key_1"); + rowExtra.addString("c1", "c1_1"); + rowExtra.addString("c2", "c2_1"); + rowExtra.addString("c3", "c2_1"); + rowExtra.addString("c4", "c2_1"); + rowExtra.addString("c5", "c5_1"); + OperationResponse respExtra = session.apply(insertExtra); + assertFalse(respExtra.hasRowError()); + break; + } catch (IllegalArgumentException e) { + if (retried) { + throw e; + } + // Add the missing column and retry. + if (e.getMessage().contains("Unknown column")) { + client.alterTable(TABLE_NAME, new AlterTableOptions() + .addNullableColumn("c5", Type.STRING)); + // We need to re-open the table to ensure it has the new schema. + table = client.openTable(TABLE_NAME); + retried = true; + } else { + throw e; + } + } + } + // Make sure we actually retried. + assertTrue(retried); + + // Insert a row with the old schema. + Insert insertOld = table.newInsert(); + PartialRow rowOld = insertOld.getRow(); + rowOld.addString("key", "key_3"); + rowOld.addString("c1", "c1_3"); + rowOld.addString("c2", "c2_3"); + rowOld.addString("c3", "c3_3"); + rowOld.addString("c4", "c4_3"); + OperationResponse respOld = session.apply(insertOld); + assertFalse(respOld.hasRowError()); + } } diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala index 7852906..9bee9f6 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala @@ -69,6 +69,7 @@ class DefaultSource val BATCH_SIZE = "kudu.batchSize" val KEEP_ALIVE_PERIOD_MS = "kudu.keepAlivePeriodMs" val SPLIT_SIZE_BYTES = "kudu.splitSizeBytes" + val HANDLE_SCHEMA_DRIFT = "kudu.handleSchemaDrift" /** * A nice alias for the data source so that when specifying the format @@ -203,7 +204,14 @@ class DefaultSource parameters.get(REPARTITION).map(_.toBoolean).getOrElse(defaultRepartition) val repartitionSort = parameters.get(REPARTITION_SORT).map(_.toBoolean).getOrElse(defaultRepartitionSort) - KuduWriteOptions(ignoreDuplicateRowErrors, ignoreNull, repartition, repartitionSort) + val handleSchemaDrift = + parameters.get(HANDLE_SCHEMA_DRIFT).map(_.toBoolean).getOrElse(defaultHandleSchemaDrift) + KuduWriteOptions( + ignoreDuplicateRowErrors, + ignoreNull, + repartition, + repartitionSort, + handleSchemaDrift) } private def getMasterAddrs(parameters: Map[String, String]): String = { diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala index 6cf0ace..85f1416 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala @@ -45,6 +45,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.apache.kudu.client.SessionConfiguration.FlushMode import org.apache.kudu.client._ +import org.apache.kudu.spark.kudu.SparkUtil.kuduSchema import org.apache.kudu.spark.kudu.SparkUtil._ import org.apache.kudu.Schema import org.apache.kudu.Type @@ -336,6 +337,31 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou // Get the client's last propagated timestamp on the driver. val lastPropagatedTimestamp = syncClient.getLastPropagatedTimestamp + if (writeOptions.handleSchemaDrift) { + val kuduSchema = syncClient.openTable(tableName).getSchema + val newColumns = schema.fields.filter(f => !kuduSchema.hasColumn(f.name)) + if (!newColumns.isEmpty) { + log.info( + s"adding ${newColumns.length} columns to table '$tableName' to handle schema drift") + val alter = new AlterTableOptions() + newColumns.foreach { col => + alter.addNullableColumn(col.name, sparkTypeToKuduType(col.dataType)) + } + try { + syncClient.alterTable(tableName, alter) + } catch { + case e: KuduException => + // Ignore the exception if the column already exists due to concurrent + // applications attempting to handle schema drift. + if (e.getStatus.isAlreadyPresent) { + log.info(s"column already exists in table '$tableName' while handling schema drift") + } else { + throw e + } + } + } + } + // Convert to an RDD and map the InternalRows to Rows. // This avoids any corruption as reported in SPARK-26880. var rdd = data.queryExecution.toRdd.mapPartitions { rows => diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala index abe491c..01aa3d1 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala @@ -32,6 +32,9 @@ import org.apache.kudu.spark.kudu.KuduWriteOptions._ * partitioning of the target Kudu table * @param repartitionSort if set to true, the data will also be sorted while being * repartitioned. This is only used if repartition is true. + * @param handleSchemaDrift if set to true, when fields with names that are not in + * the target Kudu table are encountered, the Kudu table + * will be altered to include new columns for those fields. */ @InterfaceAudience.Public @InterfaceStability.Evolving @@ -39,11 +42,13 @@ case class KuduWriteOptions( ignoreDuplicateRowErrors: Boolean = defaultIgnoreDuplicateRowErrors, ignoreNull: Boolean = defaultIgnoreNull, repartition: Boolean = defaultRepartition, - repartitionSort: Boolean = defaultRepartitionSort) + repartitionSort: Boolean = defaultRepartitionSort, + handleSchemaDrift: Boolean = defaultHandleSchemaDrift) object KuduWriteOptions { val defaultIgnoreDuplicateRowErrors: Boolean = false val defaultIgnoreNull: Boolean = false val defaultRepartition: Boolean = false val defaultRepartitionSort: Boolean = true + val defaultHandleSchemaDrift: Boolean = false } diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala index ebd11b8..c6baf83 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -17,8 +17,11 @@ package org.apache.kudu.spark.kudu +import java.nio.charset.StandardCharsets + import scala.collection.JavaConverters._ import scala.collection.immutable.IndexedSeq +import org.apache.spark.SparkException import org.apache.spark.sql.Row import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SaveMode @@ -558,6 +561,62 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { } @Test + def testSchemaDrift() { + val nonNullDF = + sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val") + kuduContext.insertRows(nonNullDF, simpleTableName) + + val tableOptions = Map( + "kudu.master" -> harness.getMasterAddressesAsString, + "kudu.table" -> simpleTableName + ) + val df = sqlContext.read.options(tableOptions).format("kudu").load + assertEquals(2, df.schema.fields.length) + + // Add a column not in the table schema by duplicating the val column. + val newDf = df.withColumn("val2", col("val")) + + // Insert with handleSchemaDrift = false. Note that a new column was not created. + kuduContext.upsertRows(newDf, simpleTableName, KuduWriteOptions(handleSchemaDrift = false)) + assertEquals(2, harness.getClient.openTable(simpleTableName).getSchema.getColumns.size()) + + // Insert with handleSchemaDrift = true. Note that a new column was created. + kuduContext.upsertRows(newDf, simpleTableName, KuduWriteOptions(handleSchemaDrift = true)) + assertEquals(3, harness.getClient.openTable(simpleTableName).getSchema.getColumns.size()) + + val afterDf = sqlContext.read.options(tableOptions).format("kudu").load + assertEquals(3, afterDf.schema.fields.length) + assertEquals("val2", afterDf.schema.fieldNames.last) + assertTrue(afterDf.collect().forall(r => r.getString(1) == r.getString(2))) + } + + @Test + def testInsertWrongType() { + val nonNullDF = + sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val") + kuduContext.insertRows(nonNullDF, simpleTableName) + + val tableOptions = Map( + "kudu.master" -> harness.getMasterAddressesAsString, + "kudu.table" -> simpleTableName + ) + val df = sqlContext.read.options(tableOptions).format("kudu").load + // Convert the val column to a bytes instead of string. + val toBytes = udf[Array[Byte], String](_.getBytes(StandardCharsets.UTF_8)) + val newDf = df + .withColumn("valTmp", toBytes(col("val"))) + .drop("val") + .withColumnRenamed("valTmp", "val") + + try { + kuduContext.insertRows(newDf, simpleTableName, KuduWriteOptions()) + } catch { + case e: SparkException => + assertTrue(e.getMessage.contains("val isn't [Type: binary], it's string")) + } + } + + @Test def testCreateRelationWithSchema() { // user-supplied schema that is compatible with actual schema, but with the key at the end val userSchema: StructType = StructType(
