This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch branch-1.10.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.10.x by this push:
     new a110653  [java] Favor column ids over column names in scan tokens
a110653 is described below

commit a1106532113b7e9951a5b0632e7ebdc3ad19c613
Author: Will Berkeley <[email protected]>
AuthorDate: Thu Jun 6 14:24:22 2019 -0700

    [java] Favor column ids over column names in scan tokens
    
    Previously, a scan token would use column name to map a column in its
    projection to a column in the target table's current schema.
    Therefore, a scan token couldn't be used if a column were renamed
    between when the token is cut and when it is rehydrated into a scanner.
    This adjusts the Java client to prefer ids to names, to fix this
    behavior. Since this involves including column ids when serializing
    columns to PBs as part of scan tokens, but the server does not permit
    clients to send column ids in most cases, this patch adds a new
    serialization option that includes column ids.
    
    Note that this patch does not make _scanners_ resistant to column name
    changes. If a scanner is opened against a table and a column name
    changes on a replica before the scanner opens a server-side scanner on
    it, the scan will fail if the column is in the projection.
    
    A follow-up will add similar capability to the C++ client.
    
    Change-Id: Ib3f05a4175c7e7bfaec2cbd3586723e6de3823f0
    Reviewed-on: http://gerrit.cloudera.org:8080/13562
    Reviewed-by: Mike Percy <[email protected]>
    Tested-by: Kudu Jenkins
    (cherry picked from commit a97276f29697ca32f599053d0e0aaa6d1e521a63)
    Reviewed-on: http://gerrit.cloudera.org:8080/13646
    Reviewed-by: Grant Henke <[email protected]>
---
 .../org/apache/kudu/client/AlterTableOptions.java  |   6 +-
 .../org/apache/kudu/client/CreateTableRequest.java |   6 +-
 .../java/org/apache/kudu/client/KuduScanToken.java |  70 ++++++---
 .../java/org/apache/kudu/client/Operation.java     |   3 +-
 .../org/apache/kudu/client/ProtobufHelper.java     |  32 ++--
 .../org/apache/kudu/client/TestKeyEncoding.java    |   9 +-
 .../java/org/apache/kudu/client/TestScanToken.java | 170 ++++++++++++++++++---
 7 files changed, 229 insertions(+), 67 deletions(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
index abf5538..cb92c29 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
@@ -297,7 +297,8 @@ public class AlterTableOptions {
     step.setAddRangePartition(builder);
     if (!pb.hasSchema()) {
       pb.setSchema(ProtobufHelper.schemaToPb(lowerBound.getSchema(),
-          EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT)));
+          EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT,
+                     SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
     }
     return this;
   }
@@ -357,7 +358,8 @@ public class AlterTableOptions {
     step.setDropRangePartition(builder);
     if (!pb.hasSchema()) {
       pb.setSchema(ProtobufHelper.schemaToPb(lowerBound.getSchema(),
-          EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT)));
+          EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT,
+                     SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
     }
     return this;
   }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
index c270f82..f5c5eb5 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
@@ -18,6 +18,7 @@
 package org.apache.kudu.client;
 
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
 
 import com.google.protobuf.Message;
@@ -25,6 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.Schema;
+import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
 
@@ -57,7 +59,9 @@ class CreateTableRequest extends KuduRpc<CreateTableResponse> 
{
   @Override
   Message createRequestPB() {
     this.builder.setName(this.name);
-    this.builder.setSchema(ProtobufHelper.schemaToPb(this.schema));
+    this.builder.setSchema(
+        ProtobufHelper.schemaToPb(this.schema,
+                                  
EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
     return this.builder.build();
   }
 
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index adabf26..bfb0712 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -19,11 +19,11 @@ package org.apache.kudu.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.UnsafeByteOperations;
@@ -32,7 +32,9 @@ import org.apache.yetus.audience.InterfaceStability;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Common;
+import org.apache.kudu.Schema;
 import org.apache.kudu.client.Client.ScanTokenPB;
+import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
 import org.apache.kudu.util.Pair;
 
 /**
@@ -159,6 +161,29 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
     return helper.toString();
   }
 
+  private static List<Integer> 
computeProjectedColumnIndexesForScanner(ScanTokenPB message,
+                                                                       Schema 
schema) {
+    List<Integer> columns = new 
ArrayList<>(message.getProjectedColumnsCount());
+    for (Common.ColumnSchemaPB colSchemaFromPb : 
message.getProjectedColumnsList()) {
+      int colIdx = colSchemaFromPb.hasId() && schema.hasColumnIds() ?
+          schema.getColumnIndex(colSchemaFromPb.getId()) :
+          schema.getColumnIndex(colSchemaFromPb.getName());
+      ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
+      if (colSchemaFromPb.getType() != 
colSchema.getType().getDataType(colSchema.getTypeAttributes())) {
+        throw new IllegalStateException(String.format(
+            "invalid type %s for column '%s' in scan token, expected: %s",
+            colSchemaFromPb.getType().name(), colSchemaFromPb.getName(), 
colSchema.getType().name()));
+      }
+      if (colSchemaFromPb.getIsNullable() != colSchema.isNullable()) {
+        throw new IllegalStateException(String.format(
+            "invalid nullability for column '%s' in scan token, expected: %s",
+            colSchemaFromPb.getName(), colSchema.isNullable() ? "NULLABLE" : 
"NOT NULL"));
+      }
+      columns.add(colIdx);
+    }
+    return columns;
+  }
+
   private static KuduScanner pbIntoScanner(ScanTokenPB message,
                                            KuduClient client) throws 
KuduException {
     Preconditions.checkArgument(
@@ -169,25 +194,9 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
                                              
client.openTable(message.getTableName());
     KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);
 
-    List<Integer> columns = new 
ArrayList<>(message.getProjectedColumnsCount());
-    for (Common.ColumnSchemaPB column : message.getProjectedColumnsList()) {
-      int columnIdx = table.getSchema().getColumnIndex(column.getName());
-      ColumnSchema schema = table.getSchema().getColumnByIndex(columnIdx);
-      if (column.getType() != 
schema.getType().getDataType(schema.getTypeAttributes())) {
-        throw new IllegalStateException(String.format(
-            "invalid type %s for column '%s' in scan token, expected: %s",
-            column.getType().name(), column.getName(), 
schema.getType().name()));
-      }
-      if (column.getIsNullable() != schema.isNullable()) {
-        throw new IllegalStateException(String.format(
-            "invalid nullability for column '%s' in scan token, expected: %s",
-            column.getName(), column.getIsNullable() ? "NULLABLE" : "NOT 
NULL"));
-
-      }
 
-      columns.add(columnIdx);
-    }
-    builder.setProjectedColumnIndexes(columns);
+    builder.setProjectedColumnIndexes(
+        computeProjectedColumnIndexesForScanner(message, table.getSchema()));
 
     for (Common.ColumnPredicatePB pred : message.getColumnPredicatesList()) {
       builder.addPredicate(KuduPredicate.fromPB(table.getSchema(), pred));
@@ -355,21 +364,32 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
 
       // Map the column names or indices to actual columns in the table schema.
       // If the user did not set either projection, then scan all columns.
+      Schema schema = table.getSchema();
       if (projectedColumnNames != null) {
         for (String columnName : projectedColumnNames) {
-          ColumnSchema columnSchema = table.getSchema().getColumn(columnName);
+          ColumnSchema columnSchema = schema.getColumn(columnName);
           Preconditions.checkArgument(columnSchema != null, "unknown column 
i%s", columnName);
-          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), 
columnSchema);
+          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
+                                    schema.hasColumnIds() ? 
schema.getColumnId(columnName) : -1,
+                                    columnSchema);
         }
       } else if (projectedColumnIndexes != null) {
         for (int columnIdx : projectedColumnIndexes) {
-          ColumnSchema columnSchema = 
table.getSchema().getColumnByIndex(columnIdx);
+          ColumnSchema columnSchema = schema.getColumnByIndex(columnIdx);
           Preconditions.checkArgument(columnSchema != null, "unknown column 
index %s", columnIdx);
-          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), 
columnSchema);
+          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
+                                    schema.hasColumnIds() ?
+                                        
schema.getColumnId(columnSchema.getName()) :
+                                        -1,
+                                    columnSchema);
         }
       } else {
-        for (ColumnSchema column : table.getSchema().getColumns()) {
-          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), 
column);
+        for (ColumnSchema column : schema.getColumns()) {
+          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
+                                    schema.hasColumnIds() ?
+                                        schema.getColumnId(column.getName()) :
+                                        -1,
+                                    column);
         }
       }
 
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index bb655f0..998692b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -290,7 +290,8 @@ public abstract class Operation extends 
KuduRpc<OperationResponse> {
 
     Tserver.WriteRequestPB.Builder requestBuilder = 
Tserver.WriteRequestPB.newBuilder();
     requestBuilder.setSchema(ProtobufHelper.schemaToPb(schema,
-        EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT)));
+        EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT,
+                   SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
     requestBuilder.setRowOperations(rowOps);
     return requestBuilder;
   }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
index ed3263d..45ebe20 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
@@ -46,7 +46,8 @@ public class ProtobufHelper {
    * The flags that are not included while serializing.
    */
   public enum SchemaPBConversionFlags {
-    SCHEMA_PB_WITHOUT_COMMENT;
+    SCHEMA_PB_WITHOUT_COMMENT,
+    SCHEMA_PB_WITHOUT_ID
   }
 
   /**
@@ -58,13 +59,13 @@ public class ProtobufHelper {
     return schemaToListPb(schema, 
EnumSet.noneOf(SchemaPBConversionFlags.class));
   }
 
-  public static List<Common.ColumnSchemaPB> schemaToListPb(
-      Schema schema, EnumSet<SchemaPBConversionFlags> flags) {
-    ArrayList<Common.ColumnSchemaPB> columns =
-        new ArrayList<Common.ColumnSchemaPB>(schema.getColumnCount());
+  public static List<Common.ColumnSchemaPB> schemaToListPb(Schema schema,
+                                                           
EnumSet<SchemaPBConversionFlags> flags) {
+    ArrayList<Common.ColumnSchemaPB> columns = new 
ArrayList<>(schema.getColumnCount());
     Common.ColumnSchemaPB.Builder schemaBuilder = 
Common.ColumnSchemaPB.newBuilder();
     for (ColumnSchema col : schema.getColumns()) {
-      columns.add(columnToPb(schemaBuilder, col, flags));
+      int id = schema.hasColumnIds() ? schema.getColumnId(col.getName()) : -1;
+      columns.add(columnToPb(schemaBuilder, id, col, flags));
       schemaBuilder.clear();
     }
     return columns;
@@ -74,24 +75,30 @@ public class ProtobufHelper {
     return schemaToPb(schema, EnumSet.noneOf(SchemaPBConversionFlags.class));
   }
 
-  public static Common.SchemaPB schemaToPb(
-      Schema schema, EnumSet<SchemaPBConversionFlags> flags) {
+  public static Common.SchemaPB schemaToPb(Schema schema,
+                                           EnumSet<SchemaPBConversionFlags> 
flags) {
     Common.SchemaPB.Builder builder = Common.SchemaPB.newBuilder();
     builder.addAllColumns(schemaToListPb(schema, flags));
     return builder.build();
   }
 
   public static Common.ColumnSchemaPB columnToPb(ColumnSchema column) {
-    return columnToPb(Common.ColumnSchemaPB.newBuilder(), column);
+    return columnToPb(Common.ColumnSchemaPB.newBuilder(), -1, column);
   }
 
   public static Common.ColumnSchemaPB columnToPb(Common.ColumnSchemaPB.Builder 
schemaBuilder,
+                                                 int colId,
                                                  ColumnSchema column) {
-    return columnToPb(schemaBuilder, column, 
EnumSet.noneOf(SchemaPBConversionFlags.class));
+    return columnToPb(schemaBuilder,
+                      colId,
+                      column,
+                      EnumSet.noneOf(SchemaPBConversionFlags.class));
   }
 
   public static Common.ColumnSchemaPB columnToPb(Common.ColumnSchemaPB.Builder 
schemaBuilder,
-      ColumnSchema column, EnumSet<SchemaPBConversionFlags> flags) {
+                                                 int colId,
+                                                 ColumnSchema column,
+                                                 
EnumSet<SchemaPBConversionFlags> flags) {
     schemaBuilder
         .setName(column.getName())
         .setType(column.getWireType())
@@ -99,6 +106,9 @@ public class ProtobufHelper {
         .setIsNullable(column.isNullable())
         .setCfileBlockSize(column.getDesiredBlockSize());
 
+    if (!flags.contains(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID) && colId 
>= 0) {
+      schemaBuilder.setId(colId);
+    }
     if (column.getEncoding() != null) {
       schemaBuilder.setEncoding(column.getEncoding().getInternalPbType());
     }
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
index 4caf6dd..2b80d4a 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
@@ -37,6 +38,7 @@ import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema;
 import org.apache.kudu.client.PartitionSchema.RangeSchema;
+import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
 import org.apache.kudu.util.DecimalUtil;
 
 public class TestKeyEncoding {
@@ -55,9 +57,10 @@ public class TestKeyEncoding {
     int i = 0;
     Common.SchemaPB.Builder pb = Common.SchemaPB.newBuilder();
     for (ColumnSchemaBuilder column : columns) {
-      Common.ColumnSchemaPB.Builder columnPb =
-          ProtobufHelper.columnToPb(column.build()).toBuilder();
-      columnPb.setId(i++);
+      Common.ColumnSchemaPB columnPb =
+          ProtobufHelper.columnToPb(Common.ColumnSchemaPB.newBuilder(),
+                                    i++,
+                                    column.build());
       pb.addColumns(columnPb);
     }
     return ProtobufHelper.pbToSchema(pb.build());
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
index 1d2da18..c203e95 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
@@ -215,41 +215,64 @@ public class TestScanToken {
       assertTrue(e.getMessage().contains("Unknown column"));
     }
 
-    // Add back the column with the wrong type.
+    // Add a column with the same name, type, and nullability. It will have a 
different id-- it's a
+    // different column-- so the scan token will fail.
     client.alterTable(
         testTableName,
-        new AlterTableOptions().addColumn(
-            new ColumnSchema.ColumnSchemaBuilder("a", 
Type.STRING).nullable(true).build()));
+        new AlterTableOptions()
+            .addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64)
+                .nullable(false)
+                .defaultValue(0L).build()));
     try {
       token.intoScanner(client);
       fail();
-    } catch (IllegalStateException e) {
+    } catch (IllegalArgumentException e) {
       assertTrue(e.getMessage().contains(
-          "invalid type INT64 for column 'a' in scan token, expected: 
STRING"));
+          "Unknown column"));
     }
+  }
 
-    // Add the column with the wrong nullability.
-    client.alterTable(
-        testTableName,
-        new AlterTableOptions().dropColumn("a")
-                               .addColumn(new 
ColumnSchema.ColumnSchemaBuilder("a", Type.INT64)
-                                                          
.nullable(true).build()));
+  /**
+   * Tests that it is possible to create a scan token, rename a column, and 
rehydrate a scanner from
+   * the scan token with the old column name.
+   */
+  @Test
+  public void testScanTokensConcurrentColumnRename() throws Exception {
+    Schema schema = getBasicSchema();
+    String oldColName = schema.getColumnByIndex(1).getName();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.setRangePartitionColumns(ImmutableList.of());
+    createOptions.setNumReplicas(1);
+    client.createTable(testTableName, schema, createOptions);
+
+    KuduTable table = client.openTable(testTableName);
+
+    KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
client.newScanTokenBuilder(table);
+    List<KuduScanToken> tokens = tokenBuilder.build();
+    assertEquals(1, tokens.size());
+    KuduScanToken token = tokens.get(0);
+
+    // Rename a column.
+    String newColName = "new-name";
+    client.alterTable(testTableName, new 
AlterTableOptions().renameColumn(oldColName, newColName));
+
+    KuduScanner scanner = token.intoScanner(client);
+
+    // TODO(wdberkeley): Handle renaming a column between when the token is 
rehydrated as a scanner
+    //  and when the scanner first hits a replica. Note that this is almost 
certainly a very
+    //  short period of vulnerability.
+
+    assertEquals(0, countRowsInScan(scanner));
+
+    // Test that the old name cannot be used and the new name can be.
+    Schema alteredSchema = scanner.getProjectionSchema();
     try {
-      token.intoScanner(client);
+      alteredSchema.getColumn(oldColName);
       fail();
-    } catch (IllegalStateException e) {
-      assertTrue(e.getMessage().contains(
-          "invalid nullability for column 'a' in scan token, expected: NOT 
NULL"));
+    } catch (IllegalArgumentException ex) {
+      // Good.
     }
-
-    // Add the column with the correct type and nullability.
-    client.alterTable(
-        testTableName,
-        new AlterTableOptions().dropColumn("a")
-                               .addColumn(new 
ColumnSchema.ColumnSchemaBuilder("a", Type.INT64)
-                                                          .nullable(false)
-                                                          
.defaultValue(0L).build()));
-    token.intoScanner(client);
+    alteredSchema.getColumn(newColName);
   }
 
   /**
@@ -357,4 +380,103 @@ public class TestScanToken {
       assertEquals(SCAN_REQUEST_TIMEOUT_MS, scanner.getScanRequestTimeout());
     }
   }
+
+  // Helper for scan token tests that use diff scan.
+  private long setupTableForDiffScans(KuduClient client,
+                                      KuduTable table,
+                                      int numRows) throws Exception {
+    KuduSession session = client.newSession();
+    for (int i = 0 ; i < numRows / 2; i++) {
+      session.apply(createBasicSchemaInsert(table, i));
+    }
+
+    // Grab the timestamp, then add more data so there's a diff.
+    long timestamp = client.getLastPropagatedTimestamp();
+    for (int i = numRows / 2; i < numRows; i++) {
+      session.apply(createBasicSchemaInsert(table, i));
+    }
+    // Delete some data so the is_deleted column can be tested.
+    for (int i = 0; i < numRows / 4; i++) {
+      Delete delete = table.newDelete();
+      PartialRow row = delete.getRow();
+      row.addInt(0, i);
+      session.apply(delete);
+    }
+
+    return timestamp;
+  }
+
+  // Helper to check diff scan results.
+  private void checkDiffScanResults(KuduScanner scanner,
+                                    int numExpectedMutations,
+                                    int numExpectedDeletes) throws 
KuduException {
+    int numMutations = 0;
+    int numDeletes = 0;
+    while (scanner.hasMoreRows()) {
+      for (RowResult rowResult : scanner.nextRows()) {
+        numMutations++;
+        if (rowResult.isDeleted()) numDeletes++;
+      }
+    }
+    assertEquals(numExpectedMutations, numMutations);
+    assertEquals(numExpectedDeletes, numDeletes);
+  }
+
+  /** Test that scan tokens work with diff scans. */
+  @Test
+  public void testDiffScanTokens() throws Exception {
+    Schema schema = getBasicSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.setRangePartitionColumns(ImmutableList.of());
+    createOptions.setNumReplicas(1);
+    KuduTable table = client.createTable(testTableName, schema, createOptions);
+
+    // Set up the table for a diff scan.
+    int numRows = 20;
+    long timestamp = setupTableForDiffScans(client, table, numRows);
+
+    // Since the diff scan interval is [start, end), increment the start 
timestamp to exclude
+    // the last row inserted in the first group of ops, and increment the end 
timestamp to include
+    // the last row deleted in the second group of ops.
+    List<KuduScanToken> tokens = client.newScanTokenBuilder(table)
+        .diffScan(timestamp + 1, client.getLastPropagatedTimestamp() + 1)
+        .build();
+    assertEquals(1, tokens.size());
+
+    checkDiffScanResults(tokens.get(0).intoScanner(client), 3 * numRows / 4, 
numRows / 4);
+  }
+
+  /** Test that scan tokens work with diff scans even when columns are 
renamed. */
+  @Test
+  public void testDiffScanTokensConcurrentColumnRename() throws Exception {
+    Schema schema = getBasicSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.setRangePartitionColumns(ImmutableList.of());
+    createOptions.setNumReplicas(1);
+    KuduTable table = client.createTable(testTableName, schema, createOptions);
+
+    // Set up the table for a diff scan.
+    int numRows = 20;
+    long timestamp = setupTableForDiffScans(client, table, numRows);
+
+    // Since the diff scan interval is [start, end), increment the start 
timestamp to exclude
+    // the last row inserted in the first group of ops, and increment the end 
timestamp to include
+    // the last row deleted in the second group of ops.
+    List<KuduScanToken> tokens = client.newScanTokenBuilder(table)
+        .diffScan(timestamp + 1, client.getLastPropagatedTimestamp() + 1)
+        .build();
+    assertEquals(1, tokens.size());
+
+    // Rename a column between when the token is created and when it is 
rehydrated into a scanner
+    client.alterTable(table.getName(),
+                      new AlterTableOptions().renameColumn("column1_i", 
"column1_i_new"));
+
+    KuduScanner scanner = tokens.get(0).intoScanner(client);
+
+    // TODO(wdberkeley): Handle renaming a column between when the token is 
rehydrated as a scanner
+    //  and when the scanner first hits a replica. Note that this is almost 
certainly a very
+    //  short period of vulnerability.
+
+    checkDiffScanResults(scanner, 3 * numRows / 4, numRows / 4);
+  }
 }

Reply via email to