This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 759f04c7ec868bb1cec4a196e47cf988af8875fd
Author: Grant Henke <[email protected]>
AuthorDate: Thu Feb 6 15:05:16 2020 -0600

    KUDU-3049: [spark] Automatic handling of schema drift
    
    This patch adds a new write option `kudu.handleSchemaDrift`.
    If set to true, when new fields are encountered the Kudu table
    will be altered to include new columns for those fields.
    
    Change-Id: Ib1edebb293d6ae79c26a0ecb9ce7755308f667f4
    Reviewed-on: http://gerrit.cloudera.org:8080/15176
    Reviewed-by: Adar Dembo <[email protected]>
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 .../org/apache/kudu/client/TestAlterTable.java     | 41 ++++++++++++++
 .../org/apache/kudu/client/TestKuduClient.java     | 63 ++++++++++++++++++++++
 .../org/apache/kudu/spark/kudu/DefaultSource.scala | 10 +++-
 .../org/apache/kudu/spark/kudu/KuduContext.scala   | 26 +++++++++
 .../apache/kudu/spark/kudu/KuduWriteOptions.scala  |  7 ++-
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 59 ++++++++++++++++++++
 6 files changed, 204 insertions(+), 2 deletions(-)

diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
index d1f5cb0..715ced4 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
@@ -35,9 +35,11 @@ import java.util.Map;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.function.ThrowingRunnable;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
@@ -156,6 +158,18 @@ public class TestAlterTable {
         ", INT32 addNullable=101, INT32 addNullableDef=NULL");
     Collections.sort(expected);
     assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
+
+    NonRecoverableException thrown =
+            Assert.assertThrows(NonRecoverableException.class, new ThrowingRunnable() {
+              @Override
+              public void run() throws Exception {
+                // Add duplicate column
+                client.alterTable(tableName, new AlterTableOptions()
+                        .addNullableColumn("addNullable", Type.INT32));
+              }
+            });
+    Assert.assertTrue(thrown.getStatus().isAlreadyPresent());
+    Assert.assertTrue(thrown.getMessage().contains("The column already exists"));
   }
 
   @Test
@@ -523,4 +537,31 @@ public class TestAlterTable {
     table = client.openTable(tableName);
     assertTrue(table.getExtraConfig().isEmpty());
   }
+
+  @Test
+  @KuduTestHarness.MasterServerConfig(flags = { "--max_num_columns=10" })
+  public void testAlterExceedsColumnLimit() throws Exception {
+    ArrayList<ColumnSchema> columns = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      columns.add(new ColumnSchema.ColumnSchemaBuilder(Integer.toString(i), Type.INT32)
+              .key(i == 0)
+              .build());
+    }
+    Schema schema = new Schema(columns);
+    CreateTableOptions createOptions =
+            new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("0"));
+    client.createTable(tableName, schema, createOptions);
+
+    NonRecoverableException thrown =
+            Assert.assertThrows(NonRecoverableException.class, new ThrowingRunnable() {
+              @Override
+              public void run() throws Exception {
+                client.alterTable(tableName,
+                        new AlterTableOptions().addNullableColumn("11", Type.INT32));
+              }
+            });
+    Assert.assertTrue(thrown.getStatus().isInvalidArgument());
+    Assert.assertTrue(thrown.getMessage()
+            .contains("number of columns 11 is greater than the permitted maximum 10"));
+  }
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 1fc7d99..4a7a473 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -1344,4 +1344,67 @@ public class TestKuduClient {
     assertTrue("Missing warning:\n" + loggedText,
                loggedText.contains("this is unsafe"));
   }
+
+  @Test(timeout = 100000)
+  public void testSchemaDriftPattern() throws Exception {
+    KuduTable table = client.createTable(
+            TABLE_NAME, createManyStringsSchema(), getBasicCreateTableOptions().setWait(false));
+    KuduSession session = client.newSession();
+
+    // Insert a row.
+    Insert insert = table.newInsert();
+    PartialRow row = insert.getRow();
+    row.addString("key", "key_0");
+    row.addString("c1", "c1_0");
+    row.addString("c2", "c2_0");
+    row.addString("c3", "c3_0");
+    row.addString("c4", "c4_0");
+    OperationResponse resp = session.apply(insert);
+    assertFalse(resp.hasRowError());
+
+    // Insert a row with an extra column.
+    boolean retried = false;
+    while (true) {
+      try {
+        Insert insertExtra = table.newInsert();
+        PartialRow rowExtra = insertExtra.getRow();
+        rowExtra.addString("key", "key_1");
+        rowExtra.addString("c1", "c1_1");
+        rowExtra.addString("c2", "c2_1");
+        rowExtra.addString("c3", "c2_1");
+        rowExtra.addString("c4", "c2_1");
+        rowExtra.addString("c5", "c5_1");
+        OperationResponse respExtra = session.apply(insertExtra);
+        assertFalse(respExtra.hasRowError());
+        break;
+      } catch (IllegalArgumentException e) {
+        if (retried) {
+          throw e;
+        }
+        // Add the missing column and retry.
+        if (e.getMessage().contains("Unknown column")) {
+          client.alterTable(TABLE_NAME, new AlterTableOptions()
+                  .addNullableColumn("c5", Type.STRING));
+          // We need to re-open the table to ensure it has the new schema.
+          table = client.openTable(TABLE_NAME);
+          retried = true;
+        } else {
+          throw e;
+        }
+      }
+    }
+    // Make sure we actually retried.
+    assertTrue(retried);
+
+    // Insert a row with the old schema.
+    Insert insertOld = table.newInsert();
+    PartialRow rowOld = insertOld.getRow();
+    rowOld.addString("key", "key_3");
+    rowOld.addString("c1", "c1_3");
+    rowOld.addString("c2", "c2_3");
+    rowOld.addString("c3", "c3_3");
+    rowOld.addString("c4", "c4_3");
+    OperationResponse respOld = session.apply(insertOld);
+    assertFalse(respOld.hasRowError());
+  }
 }
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 7852906..9bee9f6 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -69,6 +69,7 @@ class DefaultSource
   val BATCH_SIZE = "kudu.batchSize"
   val KEEP_ALIVE_PERIOD_MS = "kudu.keepAlivePeriodMs"
   val SPLIT_SIZE_BYTES = "kudu.splitSizeBytes"
+  val HANDLE_SCHEMA_DRIFT = "kudu.handleSchemaDrift"
 
   /**
    * A nice alias for the data source so that when specifying the format
@@ -203,7 +204,14 @@ class DefaultSource
       parameters.get(REPARTITION).map(_.toBoolean).getOrElse(defaultRepartition)
     val repartitionSort =
       parameters.get(REPARTITION_SORT).map(_.toBoolean).getOrElse(defaultRepartitionSort)
-    KuduWriteOptions(ignoreDuplicateRowErrors, ignoreNull, repartition, repartitionSort)
+    val handleSchemaDrift =
+      parameters.get(HANDLE_SCHEMA_DRIFT).map(_.toBoolean).getOrElse(defaultHandleSchemaDrift)
+    KuduWriteOptions(
+      ignoreDuplicateRowErrors,
+      ignoreNull,
+      repartition,
+      repartitionSort,
+      handleSchemaDrift)
   }
 
   private def getMasterAddrs(parameters: Map[String, String]): String = {
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 6cf0ace..85f1416 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -45,6 +45,7 @@ import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 import org.apache.kudu.client.SessionConfiguration.FlushMode
 import org.apache.kudu.client._
+import org.apache.kudu.spark.kudu.SparkUtil.kuduSchema
 import org.apache.kudu.spark.kudu.SparkUtil._
 import org.apache.kudu.Schema
 import org.apache.kudu.Type
@@ -336,6 +337,31 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
     // Get the client's last propagated timestamp on the driver.
     val lastPropagatedTimestamp = syncClient.getLastPropagatedTimestamp
 
+    if (writeOptions.handleSchemaDrift) {
+      val kuduSchema = syncClient.openTable(tableName).getSchema
+      val newColumns = schema.fields.filter(f => !kuduSchema.hasColumn(f.name))
+      if (!newColumns.isEmpty) {
+        log.info(
+          s"adding ${newColumns.length} columns to table '$tableName' to handle schema drift")
+        val alter = new AlterTableOptions()
+        newColumns.foreach { col =>
+          alter.addNullableColumn(col.name, sparkTypeToKuduType(col.dataType))
+        }
+        try {
+          syncClient.alterTable(tableName, alter)
+        } catch {
+          case e: KuduException =>
+            // Ignore the exception if the column already exists due to concurrent
+            // applications attempting to handle schema drift.
+            if (e.getStatus.isAlreadyPresent) {
+              log.info(s"column already exists in table '$tableName' while handling schema drift")
+            } else {
+              throw e
+            }
+        }
+      }
+    }
+
     // Convert to an RDD and map the InternalRows to Rows.
     // This avoids any corruption as reported in SPARK-26880.
     var rdd = data.queryExecution.toRdd.mapPartitions { rows =>
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala
index abe491c..01aa3d1 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala
@@ -32,6 +32,9 @@ import org.apache.kudu.spark.kudu.KuduWriteOptions._
  *                   partitioning of the target Kudu table
  * @param repartitionSort if set to true, the data will also be sorted while being
  *                   repartitioned. This is only used if repartition is true.
+ * @param handleSchemaDrift if set to true, when fields with names that are not in
+ *                          the target Kudu table are encountered, the Kudu table
+ *                          will be altered to include new columns for those fields.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -39,11 +42,13 @@ case class KuduWriteOptions(
     ignoreDuplicateRowErrors: Boolean = defaultIgnoreDuplicateRowErrors,
     ignoreNull: Boolean = defaultIgnoreNull,
     repartition: Boolean = defaultRepartition,
-    repartitionSort: Boolean = defaultRepartitionSort)
+    repartitionSort: Boolean = defaultRepartitionSort,
+    handleSchemaDrift: Boolean = defaultHandleSchemaDrift)
 
 object KuduWriteOptions {
   val defaultIgnoreDuplicateRowErrors: Boolean = false
   val defaultIgnoreNull: Boolean = false
   val defaultRepartition: Boolean = false
   val defaultRepartitionSort: Boolean = true
+  val defaultHandleSchemaDrift: Boolean = false
 }
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index ebd11b8..c6baf83 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -17,8 +17,11 @@
 
 package org.apache.kudu.spark.kudu
 
+import java.nio.charset.StandardCharsets
+
 import scala.collection.JavaConverters._
 import scala.collection.immutable.IndexedSeq
+import org.apache.spark.SparkException
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.SaveMode
@@ -558,6 +561,62 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   }
 
   @Test
+  def testSchemaDrift() {
+    val nonNullDF =
+      sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
+    kuduContext.insertRows(nonNullDF, simpleTableName)
+
+    val tableOptions = Map(
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.table" -> simpleTableName
+    )
+    val df = sqlContext.read.options(tableOptions).format("kudu").load
+    assertEquals(2, df.schema.fields.length)
+
+    // Add a column not in the table schema by duplicating the val column.
+    val newDf = df.withColumn("val2", col("val"))
+
+    // Insert with handleSchemaDrift = false. Note that a new column was not created.
+    kuduContext.upsertRows(newDf, simpleTableName, KuduWriteOptions(handleSchemaDrift = false))
+    assertEquals(2, harness.getClient.openTable(simpleTableName).getSchema.getColumns.size())
+
+    // Insert with handleSchemaDrift = true. Note that a new column was created.
+    kuduContext.upsertRows(newDf, simpleTableName, KuduWriteOptions(handleSchemaDrift = true))
+    assertEquals(3, harness.getClient.openTable(simpleTableName).getSchema.getColumns.size())
+
+    val afterDf = sqlContext.read.options(tableOptions).format("kudu").load
+    assertEquals(3, afterDf.schema.fields.length)
+    assertEquals("val2", afterDf.schema.fieldNames.last)
+    assertTrue(afterDf.collect().forall(r => r.getString(1) == r.getString(2)))
+  }
+
+  @Test
+  def testInsertWrongType() {
+    val nonNullDF =
+      sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
+    kuduContext.insertRows(nonNullDF, simpleTableName)
+
+    val tableOptions = Map(
+      "kudu.master" -> harness.getMasterAddressesAsString,
+      "kudu.table" -> simpleTableName
+    )
+    val df = sqlContext.read.options(tableOptions).format("kudu").load
+    // Convert the val column to a bytes instead of string.
+    val toBytes = udf[Array[Byte], String](_.getBytes(StandardCharsets.UTF_8))
+    val newDf = df
+      .withColumn("valTmp", toBytes(col("val")))
+      .drop("val")
+      .withColumnRenamed("valTmp", "val")
+
+    try {
+      kuduContext.insertRows(newDf, simpleTableName, KuduWriteOptions())
+    } catch {
+      case e: SparkException =>
+        assertTrue(e.getMessage.contains("val isn't [Type: binary], it's string"))
+    }
+  }
+
+  @Test
   def testCreateRelationWithSchema() {
     // user-supplied schema that is compatible with actual schema, but with the key at the end
     val userSchema: StructType = StructType(

Reply via email to