This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e78e4a  [java] Add private diff scan support
5e78e4a is described below

commit 5e78e4a5c0a100a6f9e0c0d6cb84450a4abfada1
Author: Grant Henke <[email protected]>
AuthorDate: Fri Mar 1 08:46:55 2019 -0600

    [java] Add private diff scan support
    
    Adds a private diff scan API to the java client. This is
    achieved primarily by adding a
    `diffScan(long startTimestamp, long endTimestamp)`
    method to the AbstractKuduScannerBuilder builder,
    and then configuring the scan RPC to include the set fields.
    
    The column projection of the scan will include an extra
    `IS_DELETED` column when a diff scan is performed.
    The value of the `IS_DELETED` virtual column can be
    retrieved via `RowResult.isDeleted()`
    
    Change-Id: I51b01ae76cd22407df9097b56629ac7262ec2964
    Reviewed-on: http://gerrit.cloudera.org:8080/12689
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <[email protected]>
    Reviewed-by: Mike Percy <[email protected]>
---
 .../main/java/org/apache/kudu/ColumnSchema.java    |  58 ++++-
 .../src/main/java/org/apache/kudu/Schema.java      |  26 ++-
 .../src/main/java/org/apache/kudu/Type.java        |   3 +-
 .../kudu/client/AbstractKuduScannerBuilder.java    |  20 ++
 .../org/apache/kudu/client/AsyncKuduScanner.java   |  84 +++++--
 .../java/org/apache/kudu/client/KuduScanToken.java |  16 +-
 .../java/org/apache/kudu/client/KuduScanner.java   |   4 +-
 .../java/org/apache/kudu/client/Operation.java     |   4 +-
 .../java/org/apache/kudu/client/PartialRow.java    |   3 +-
 .../org/apache/kudu/client/ProtobufHelper.java     |   3 +-
 .../java/org/apache/kudu/client/RowResult.java     |  57 +++--
 .../java/org/apache/kudu/util/DataGenerator.java   |  12 +
 .../java/org/apache/kudu/util/TimestampUtil.java   |  11 +
 .../org/apache/kudu/client/TestKuduScanner.java    | 255 ++++++++++++++++++++-
 src/kudu/client/client.proto                       |   8 +-
 15 files changed, 509 insertions(+), 55 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java 
b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
index 212ce78..1b6846e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
@@ -43,6 +43,7 @@ public class ColumnSchema {
   private final CompressionAlgorithm compressionAlgorithm;
   private final ColumnTypeAttributes typeAttributes;
   private final int typeSize;
+  private final Common.DataType wireType;
 
   /**
    * Specifies the encoding of data for a column on disk.
@@ -100,7 +101,8 @@ public class ColumnSchema {
 
   private ColumnSchema(String name, Type type, boolean key, boolean nullable,
                        Object defaultValue, int desiredBlockSize, Encoding 
encoding,
-                       CompressionAlgorithm compressionAlgorithm, 
ColumnTypeAttributes typeAttributes) {
+                       CompressionAlgorithm compressionAlgorithm,
+                       ColumnTypeAttributes typeAttributes, Common.DataType 
wireType) {
     this.name = name;
     this.type = type;
     this.key = key;
@@ -111,6 +113,7 @@ public class ColumnSchema {
     this.compressionAlgorithm = compressionAlgorithm;
     this.typeAttributes = typeAttributes;
     this.typeSize = type.getSize(typeAttributes);
+    this.wireType = wireType;
   }
 
   /**
@@ -186,6 +189,14 @@ public class ColumnSchema {
   }
 
   /**
+   * Get the column's underlying DataType.
+   */
+  @InterfaceAudience.Private
+  public Common.DataType getWireType() {
+    return wireType;
+  }
+
+  /**
    * The size of this type in bytes on the wire.
    * @return A size
    */
@@ -233,10 +244,11 @@ public class ColumnSchema {
     private boolean key = false;
     private boolean nullable = false;
     private Object defaultValue = null;
-    private int blockSize = 0;
+    private int desiredBlockSize = 0;
     private Encoding encoding = null;
     private CompressionAlgorithm compressionAlgorithm = null;
     private ColumnTypeAttributes typeAttributes = null;
+    private Common.DataType wireType = null;
 
     /**
      * Constructor for the required parameters.
@@ -249,6 +261,23 @@ public class ColumnSchema {
     }
 
     /**
+     * Constructor to copy an existing columnSchema
+     * @param that the columnSchema to copy
+     */
+    public ColumnSchemaBuilder(ColumnSchema that) {
+      this.name = that.name;
+      this.type = that.type;
+      this.key = that.key;
+      this.nullable = that.nullable;
+      this.defaultValue = that.defaultValue;
+      this.desiredBlockSize = that.desiredBlockSize;
+      this.encoding = that.encoding;
+      this.compressionAlgorithm = that.compressionAlgorithm;
+      this.typeAttributes = that.typeAttributes;
+      this.wireType = that.wireType;
+    }
+
+    /**
      * Sets if the column is part of the row key. False by default.
      * @param key a boolean that indicates if the column is part of the key
      * @return this instance
@@ -300,12 +329,12 @@ public class ColumnSchema {
      *
      * It's recommended that this not be set any lower than 4096 (4KB) or 
higher
      * than 1048576 (1MB).
-     * @param blockSize the desired block size, in bytes
+     * @param desiredBlockSize the desired block size, in bytes
      * @return this instance
      * <!-- TODO(KUDU-1107): move the above info to docs -->
      */
-    public ColumnSchemaBuilder desiredBlockSize(int blockSize) {
-      this.blockSize = blockSize;
+    public ColumnSchemaBuilder desiredBlockSize(int desiredBlockSize) {
+      this.desiredBlockSize = desiredBlockSize;
       return this;
     }
 
@@ -340,13 +369,30 @@ public class ColumnSchema {
     }
 
     /**
+     * Allows an alternate {@link Common.DataType} to override the {@link Type}
+     * when serializing the ColumnSchema on the wire.
+     * This is useful for virtual columns specified by their type such as
+     * {@link Common.DataType#IS_DELETED}.
+     */
+    @InterfaceAudience.Private
+    public ColumnSchemaBuilder wireType(Common.DataType wireType) {
+      this.wireType = wireType;
+      return this;
+    }
+
+    /**
      * Builds a {@link ColumnSchema} using the passed parameters.
      * @return a new {@link ColumnSchema}
      */
     public ColumnSchema build() {
+      // Set the wire type if it wasn't explicitly set.
+      if (wireType == null) {
+        this.wireType = type.getDataType(typeAttributes);
+      }
       return new ColumnSchema(name, type,
                               key, nullable, defaultValue,
-                              blockSize, encoding, compressionAlgorithm, 
typeAttributes);
+                              desiredBlockSize, encoding, compressionAlgorithm,
+                              typeAttributes, wireType);
     }
   }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java 
b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
index 8e19f38..f5d746e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
@@ -22,7 +22,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import org.apache.kudu.Common.DataType;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -66,6 +68,9 @@ public class Schema {
   private final int rowSize;
   private final boolean hasNullableColumns;
 
+  private final int isDeletedIndex;
+  private static final int NO_IS_DELETED_INDEX = -1;
+
   /**
    * Constructs a schema using the specified columns and does some internal 
accounting
    *
@@ -102,6 +107,7 @@ public class Schema {
     this.columnsById = hasColumnIds ? new HashMap<Integer, 
Integer>(columnIds.size()) : null;
     int offset = 0;
     boolean hasNulls = false;
+    int isDeletedIndex = NO_IS_DELETED_INDEX;
     // pre-compute a few counts and offsets
     for (int index = 0; index < columns.size(); index++) {
       final ColumnSchema column = columns.get(index);
@@ -126,11 +132,17 @@ public class Schema {
               String.format("Column IDs must be unique: %s", columnIds));
         }
       }
+
+      // If this is the IS_DELETED virtual column, record its position in `isDeletedIndex`.
+      if (column.getWireType() == DataType.IS_DELETED) {
+        isDeletedIndex = index;
+      }
     }
 
-    this.hasNullableColumns = hasNulls;
     this.varLengthColumnCount = varLenCnt;
     this.rowSize = getRowSize(this.columnsByIndex);
+    this.hasNullableColumns = hasNulls;
+    this.isDeletedIndex = isDeletedIndex;
   }
 
   /**
@@ -293,4 +305,16 @@ public class Schema {
   public PartialRow newPartialRow() {
     return new PartialRow(this);
   }
+
+  /**
+   * @return the index of the IS_DELETED virtual column
+   * @throws IllegalStateException if no IS_DELETED virtual column exists
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public int getIsDeletedIndex() {
+    Preconditions.checkState(isDeletedIndex != NO_IS_DELETED_INDEX,
+        "Schema doesn't have an IS_DELETED columns");
+    return isDeletedIndex;
+  }
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Type.java 
b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
index c3b2a91..ec7d542 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Type.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
@@ -29,7 +29,6 @@ import com.google.common.primitives.Shorts;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
-import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.util.DecimalUtil;
 
 /**
@@ -147,6 +146,7 @@ public enum Type {
         return 8 + 8; // offset then string length
       case BOOL:
       case INT8:
+      case IS_DELETED:
         return 1;
       case INT16:
         return Shorts.BYTES;
@@ -183,6 +183,7 @@ public enum Type {
       case DECIMAL64:
       case DECIMAL128:
         return DECIMAL;
+      case IS_DELETED: return BOOL;
       default:
         throw new IllegalArgumentException("The provided data type doesn't 
map" +
             " to know any known one: " + 
type.getDescriptorForType().getFullName());
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
index 10fe8cb..5865876 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
@@ -47,6 +47,7 @@ public abstract class AbstractKuduScannerBuilder
   long limit = Long.MAX_VALUE;
   boolean prefetching = false;
   boolean cacheBlocks = true;
+  long startTimestamp = AsyncKuduClient.NO_TIMESTAMP;
   long htTimestamp = AsyncKuduClient.NO_TIMESTAMP;
   byte[] lowerBoundPrimaryKey = AsyncKuduClient.EMPTY_ARRAY;
   byte[] upperBoundPrimaryKey = AsyncKuduClient.EMPTY_ARRAY;
@@ -260,6 +261,25 @@ public abstract class AbstractKuduScannerBuilder
   }
 
   /**
+   * Sets the start timestamp and end timestamp for a diff scan.
+   * The timestamps should be encoded HT timestamps.
+   *
+   * Additionally enables fault tolerance and READ_AT_SNAPSHOT mode, as required by diff scans.
+   *
+   * @param startTimestamp a long representing a HybridTime-encoded start 
timestamp
+   * @param endTimestamp a long representing a HybridTime-encoded end timestamp
+   * @return this instance
+   */
+  @InterfaceAudience.Private
+  public S diffScan(long startTimestamp, long endTimestamp) {
+    this.startTimestamp = startTimestamp;
+    this.htTimestamp = endTimestamp;
+    this.isFaultTolerant = true;
+    this.readMode = AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT;
+    return (S) this;
+  }
+
+  /**
    * Sets how long each scan request to a server can last.
    * Defaults to {@link KuduClient#getDefaultOperationTimeoutMs()}.
    * @param scanRequestTimeout a long representing time in milliseconds
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index aafe62b..7662bb5 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -44,6 +44,7 @@ import com.google.protobuf.UnsafeByteOperations;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
 import org.apache.kudu.security.Token;
+import org.apache.kudu.Type;
 import org.apache.kudu.tserver.Tserver.ScannerKeepAliveRequestPB;
 import org.apache.kudu.tserver.Tserver.ScannerKeepAliveResponsePB;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -149,6 +150,11 @@ public final class AsyncKuduScanner {
     }
   }
 
+  // This is package-private (not public) because it is not safe to rely on this
+  // column name: it may be different in the case of collisions. Instead the
+  // `IS_DELETED` column should be looked up by type.
+  static final String DEFAULT_IS_DELETED_COL_NAME = "is_deleted";
+
   //////////////////////////
   // Initial configurations.
   //////////////////////////
@@ -199,6 +205,8 @@ public final class AsyncKuduScanner {
 
   private final boolean isFaultTolerant;
 
+  private final long startTimestamp;
+
   private long htTimestamp;
 
   private long lowerBoundPropagationTimestamp = AsyncKuduClient.NO_TIMESTAMP;
@@ -253,7 +261,8 @@ public final class AsyncKuduScanner {
                    Map<String, KuduPredicate> predicates, long limit,
                    boolean cacheBlocks, boolean prefetching,
                    byte[] startPrimaryKey, byte[] endPrimaryKey,
-                   long htTimestamp, int batchSizeBytes, PartitionPruner 
pruner,
+                   long startTimestamp, long htTimestamp,
+                   int batchSizeBytes, PartitionPruner pruner,
                    ReplicaSelection replicaSelection, long keepAlivePeriodMs) {
     checkArgument(batchSizeBytes >= 0, "Need non-negative number of bytes, " +
         "got %s", batchSizeBytes);
@@ -286,29 +295,32 @@ public final class AsyncKuduScanner {
     this.prefetching = prefetching;
     this.startPrimaryKey = startPrimaryKey;
     this.endPrimaryKey = endPrimaryKey;
+    this.startTimestamp = startTimestamp;
     this.htTimestamp = htTimestamp;
     this.batchSizeBytes = batchSizeBytes;
     this.lastPrimaryKey = AsyncKuduClient.EMPTY_ARRAY;
 
     // Map the column names to actual columns in the table schema.
     // If the user set this to 'null', we scan all columns.
+    List<ColumnSchema> columns = new ArrayList<ColumnSchema>();
     if (projectedNames != null) {
-      List<ColumnSchema> columns = new ArrayList<ColumnSchema>();
       for (String columnName : projectedNames) {
         ColumnSchema originalColumn = table.getSchema().getColumn(columnName);
         columns.add(getStrippedColumnSchema(originalColumn));
       }
-      this.schema = new Schema(columns);
     } else if (projectedIndexes != null) {
-      List<ColumnSchema> columns = new ArrayList<ColumnSchema>();
       for (Integer columnIndex : projectedIndexes) {
         ColumnSchema originalColumn = 
table.getSchema().getColumnByIndex(columnIndex);
         columns.add(getStrippedColumnSchema(originalColumn));
       }
-      this.schema = new Schema(columns);
     } else {
-      this.schema = table.getSchema();
+      columns.addAll(table.getSchema().getColumns());
     }
+    // This is a diff scan so add the IS_DELETED column.
+    if (startTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+      columns.add(generateIsDeletedColumn(table.getSchema()));
+    }
+    this.schema = new Schema(columns);
 
     // If the partition pruner has pruned all partitions, then the scan can be
     // short circuited without contacting any tablet servers.
@@ -330,14 +342,44 @@ public final class AsyncKuduScanner {
   }
 
   /**
-   * Clone the given column schema instance. The new instance will include 
only the name, type, and
-   * nullability of the passed one.
+   * Generates and returns a ColumnSchema for the virtual IS_DELETED column.
+   * The column name is generated to ensure there is never a collision.
+   *
+   * @param schema the table schema
+   * @return a ColumnSchema for the virtual IS_DELETED column
+   */
+  private static ColumnSchema generateIsDeletedColumn(Schema schema) {
+    String columnName = DEFAULT_IS_DELETED_COL_NAME;
+    boolean collision = true;
+    while (collision) {
+      try {
+        // If getColumnIndex doesn't throw an IllegalArgumentException then
+        // the column already exists and we need to pick an alternate 
IS_DELETED column name.
+        schema.getColumnIndex(columnName);
+        columnName += "_";
+      } catch (IllegalArgumentException ex) {
+        collision = false;
+      }
+    }
+    return new ColumnSchema.ColumnSchemaBuilder(columnName, Type.BOOL)
+            .wireType(Common.DataType.IS_DELETED)
+            .defaultValue(false)
+            .nullable(false)
+            .key(false)
+            .build();
+  }
+
+  /**
+   * Returns a copy of the passed ColumnSchema with isKey set to false.
+   * This allows out of order key columns in projections.
+   *
+   * TODO: Remove the need for this by handling server side.
+   *
    * @return a new column schema
    */
   private static ColumnSchema getStrippedColumnSchema(ColumnSchema 
columnToClone) {
-    return new ColumnSchema.ColumnSchemaBuilder(columnToClone.getName(), 
columnToClone.getType())
-        .nullable(columnToClone.isNullable())
-        .typeAttributes(columnToClone.getTypeAttributes())
+    return new ColumnSchema.ColumnSchemaBuilder(columnToClone)
+        .key(false)
         .build();
   }
 
@@ -407,6 +449,10 @@ public final class AsyncKuduScanner {
     return keepAlivePeriodMs;
   }
 
+  long getStartSnaphshotTimestamp() {
+    return this.startTimestamp;
+  }
+
   long getSnapshotTimestamp() {
     return this.htTimestamp;
   }
@@ -983,10 +1029,14 @@ public final class AsyncKuduScanner {
           }
           
newBuilder.setReadMode(AsyncKuduScanner.this.getReadMode().pbVersion());
 
-          // if the mode is set to read on snapshot sent the snapshot timestamp
-          if (AsyncKuduScanner.this.getReadMode() == ReadMode.READ_AT_SNAPSHOT 
&&
-              AsyncKuduScanner.this.getSnapshotTimestamp() != 
AsyncKuduClient.NO_TIMESTAMP) {
-            
newBuilder.setSnapTimestamp(AsyncKuduScanner.this.getSnapshotTimestamp());
+          // If the read mode is READ_AT_SNAPSHOT, set the snapshot timestamps.
+          if (AsyncKuduScanner.this.getReadMode() == 
ReadMode.READ_AT_SNAPSHOT) {
+            if (AsyncKuduScanner.this.getSnapshotTimestamp() != 
AsyncKuduClient.NO_TIMESTAMP) {
+              
newBuilder.setSnapTimestamp(AsyncKuduScanner.this.getSnapshotTimestamp());
+            }
+            if (AsyncKuduScanner.this.getStartSnaphshotTimestamp() != 
AsyncKuduClient.NO_TIMESTAMP) {
+              
newBuilder.setSnapStartTimestamp(AsyncKuduScanner.this.getStartSnaphshotTimestamp());
+            }
           }
 
           if (isFaultTolerant) {
@@ -1120,8 +1170,8 @@ public final class AsyncKuduScanner {
       return new AsyncKuduScanner(
           client, table, projectedColumnNames, projectedColumnIndexes, 
readMode, isFaultTolerant,
           scanRequestTimeout, predicates, limit, cacheBlocks, prefetching, 
lowerBoundPrimaryKey,
-          upperBoundPrimaryKey, htTimestamp, batchSizeBytes, 
PartitionPruner.create(this),
-          replicaSelection, keepAlivePeriodMs);
+          upperBoundPrimaryKey, startTimestamp, htTimestamp, batchSizeBytes,
+          PartitionPruner.create(this), replicaSelection, keepAlivePeriodMs);
     }
   }
 }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index 32618d3..db79e14 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -212,6 +212,10 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
           if (message.hasSnapTimestamp()) {
             builder.snapshotTimestampRaw(message.getSnapTimestamp());
           }
+          // Set the diff scan timestamps if they are set.
+          if (message.hasSnapStartTimestamp()) {
+            builder.diffScan(message.getSnapStartTimestamp(), 
message.getSnapTimestamp());
+          }
           break;
         }
         case READ_LATEST: {
@@ -366,10 +370,14 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
         proto.setPropagatedTimestamp(client.getLastPropagatedTimestamp());
       }
 
-      // If the mode is set to read on snapshot set the snapshot timestamp.
-      if (readMode == AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT &&
-          htTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
-        proto.setSnapTimestamp(htTimestamp);
+      // If the mode is set to read on snapshot set the snapshot timestamps.
+      if (readMode == AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT) {
+        if (htTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+          proto.setSnapTimestamp(htTimestamp);
+        }
+        if (startTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+          proto.setSnapStartTimestamp(startTimestamp);
+        }
       }
 
       proto.setCacheBlocks(cacheBlocks);
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 3caeeb1..4e8daa7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -206,8 +206,8 @@ public class KuduScanner implements Iterable<RowResult> {
       return new KuduScanner(new AsyncKuduScanner(
           client, table, projectedColumnNames, projectedColumnIndexes, 
readMode, isFaultTolerant,
           scanRequestTimeout, predicates, limit, cacheBlocks, prefetching, 
lowerBoundPrimaryKey,
-          upperBoundPrimaryKey, htTimestamp, batchSizeBytes, 
PartitionPruner.create(this),
-          replicaSelection, keepAlivePeriodMs));
+          upperBoundPrimaryKey, startTimestamp, htTimestamp, batchSizeBytes,
+          PartitionPruner.create(this), replicaSelection, keepAlivePeriodMs));
     }
   }
 }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 726a2dc..200e8e2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -355,7 +355,9 @@ public abstract class Operation extends 
KuduRpc<OperationResponse> {
         columnCount = row.getSchema().getPrimaryKeyColumnCount();
         // Clear the bits indicating any non-key fields are set.
         columnsBitSet.clear(schema.getPrimaryKeyColumnCount(), 
columnsBitSet.size() - 1);
-        nullsBitSet.clear(schema.getPrimaryKeyColumnCount(), 
nullsBitSet.size() - 1);
+        if (schema.hasNullableColumns()) {
+          nullsBitSet.clear(schema.getPrimaryKeyColumnCount(), 
nullsBitSet.size() - 1);
+        }
       }
 
       rows.put(type.toEncodedByte());
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
index c7c64bd..261d42a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
@@ -1515,7 +1515,8 @@ public class PartialRow {
     int offset = lower.getSchema().getColumnOffset(index);
 
     switch (type) {
-      case BOOL: return lower.rowAlloc[offset] + 1 == upper.rowAlloc[offset];
+      case BOOL:
+        return lower.rowAlloc[offset] + 1 == upper.rowAlloc[offset];
       case INT8: {
         byte val = lower.rowAlloc[offset];
         return val != Byte.MAX_VALUE && val + 1 == upper.rowAlloc[offset];
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
index 432ad28..bbb58ce 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
@@ -71,10 +71,11 @@ public class ProtobufHelper {
                                                  ColumnSchema column) {
     schemaBuilder
         .setName(column.getName())
-        .setType(column.getType().getDataType(column.getTypeAttributes()))
+        .setType(column.getWireType())
         .setIsKey(column.isKey())
         .setIsNullable(column.isNullable())
         .setCfileBlockSize(column.getDesiredBlockSize());
+
     if (column.getEncoding() != null) {
       schemaBuilder.setEncoding(column.getEncoding().getInternalPbType());
     }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
index dd726aa..7f80670 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
@@ -20,6 +20,7 @@ package org.apache.kudu.client;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
+import java.util.Arrays;
 import java.util.BitSet;
 
 import org.apache.kudu.util.TimestampUtil;
@@ -248,10 +249,8 @@ public class RowResult {
   public long getLong(int columnIndex) {
     checkValidColumn(columnIndex);
     checkNull(columnIndex);
-    // Can't check type because this could be a long, string, or Timestamp.
-    return Bytes.getLong(this.rowData.getRawArray(),
-                         this.rowData.getRawOffset() +
-                             getCurrentRowDataOffsetForColumn(columnIndex));
+    checkType(columnIndex, Type.INT64, Type.UNIXTIME_MICROS);
+    return getLongOrOffset(columnIndex);
   }
 
   /**
@@ -366,10 +365,7 @@ public class RowResult {
     checkValidColumn(columnIndex);
     checkNull(columnIndex);
     checkType(columnIndex, Type.UNIXTIME_MICROS);
-    ColumnSchema column = schema.getColumnByIndex(columnIndex);
-    long micros = Bytes.getLong(this.rowData.getRawArray(),
-        this.rowData.getRawOffset() +
-            getCurrentRowDataOffsetForColumn(columnIndex));
+    long micros = getLongOrOffset(columnIndex);
     return TimestampUtil.microsToTimestamp(micros);
   }
 
@@ -405,8 +401,8 @@ public class RowResult {
     checkValidColumn(columnIndex);
     checkNull(columnIndex);
     checkType(columnIndex, Type.STRING);
-    // C++ puts a Slice in rowData which is 16 bytes long for simplity, but we 
only support ints.
-    long offset = getLong(columnIndex);
+    // C++ puts a Slice in rowData which is 16 bytes long for simplicity, but 
we only support ints.
+    long offset = getLongOrOffset(columnIndex);
     long length = 
rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
     assert offset < Integer.MAX_VALUE;
     assert length < Integer.MAX_VALUE;
@@ -441,7 +437,7 @@ public class RowResult {
     checkNull(columnIndex);
     // C++ puts a Slice in rowData which is 16 bytes long for simplicity,
     // but we only support ints.
-    long offset = getLong(columnIndex);
+    long offset = getLongOrOffset(columnIndex);
     long length = 
rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
     assert offset < Integer.MAX_VALUE;
     assert length < Integer.MAX_VALUE;
@@ -483,7 +479,7 @@ public class RowResult {
     checkType(columnIndex, Type.BINARY);
     // C++ puts a Slice in rowData which is 16 bytes long for simplicity,
     // but we only support ints.
-    long offset = getLong(columnIndex);
+    long offset = getLongOrOffset(columnIndex);
     long length = 
rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
     assert offset < Integer.MAX_VALUE;
     assert length < Integer.MAX_VALUE;
@@ -492,6 +488,18 @@ public class RowResult {
   }
 
   /**
+   * Returns the long column value if the column type is INT64 or 
UNIXTIME_MICROS.
+   * Returns the column's offset into the indirectData if the column type is 
BINARY or STRING.
+   * @param columnIndex Column index in the schema
+   * @return a long value for the column
+   */
+  long getLongOrOffset(int columnIndex) {
+    return Bytes.getLong(this.rowData.getRawArray(),
+        this.rowData.getRawOffset() +
+            getCurrentRowDataOffsetForColumn(columnIndex));
+  }
+
+  /**
    * Get if the specified column is NULL
    * @param columnName name of the column in the schema
    * @return true if the column cell is null and the column is nullable,
@@ -589,6 +597,16 @@ public class RowResult {
   }
 
   /**
+   * @return the value of the IS_DELETED virtual column
+   * @throws IllegalStateException if no IS_DELETED virtual column exists
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public boolean isDeleted() {
+    return getBoolean(schema.getIsDeletedIndex());
+  }
+
+  /**
    * Get the type of a column in this result.
    * @param columnName name of the column
    * @return a type
@@ -639,14 +657,17 @@ public class RowResult {
     }
   }
 
-  private void checkType(int columnIndex, Type expectedType) {
+  private void checkType(int columnIndex, Type... types) {
     ColumnSchema columnSchema = schema.getColumnByIndex(columnIndex);
     Type columnType = columnSchema.getType();
-    if (!columnType.equals(expectedType)) {
-      throw new IllegalArgumentException("Column (name: " + 
columnSchema.getName() +
-          ", index: " + columnIndex + ") is of type " +
-          columnType.getName() + " but was requested as a type " + 
expectedType.getName());
+    for (Type type : types) {
+      if (columnType.equals(type)) {
+        return;
+      }
     }
+    throw new IllegalArgumentException("Column (name: " + 
columnSchema.getName() +
+        ", index: " + columnIndex + ") is of type " +
+        columnType.getName() + " but was requested as a type " + 
Arrays.toString(types));
   }
 
   @Override
@@ -689,7 +710,7 @@ public class RowResult {
             buf.append(getLong(i));
             break;
           case UNIXTIME_MICROS: {
-            buf.append(TimestampUtil.timestampToString(getLong(i)));
+            buf.append(TimestampUtil.timestampToString(getTimestamp(i)));
           } break;
           case STRING:
             buf.append(getString(i));
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java 
b/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
index 63cff02..045f8fb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
@@ -62,10 +62,22 @@ public class DataGenerator {
    * @param row the PartialRow to randomize.
    */
   public void randomizeRow(PartialRow row) {
+    this.randomizeRow(row, true);
+  }
+
+  /**
+   * Randomizes the fields in a given PartialRow.
+   * @param row the PartialRow to randomize.
+   * @param randomizeKeys true if the key columns should be randomized.
+   */
+  public void randomizeRow(PartialRow row, boolean randomizeKeys) {
     Schema schema = row.getSchema();
     List<ColumnSchema> columns = schema.getColumns();
     for (int i = 0; i < columns.size(); i++) {
       ColumnSchema col = columns.get(i);
+      if (col.isKey() && !randomizeKeys) {
+        continue;
+      }
       Type type = col.getType();
       if (col.isNullable() && random.nextFloat() <= nullRate) {
         // Sometimes set nullable columns to null.
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java 
b/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
index f6ebd65..959b8a5 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
@@ -80,6 +80,17 @@ public class TimestampUtil {
   /**
    * Transforms a timestamp into a string, whose formatting and timezone is 
consistent
    * across Kudu.
+   * @param timestamp the timestamp
+   * @return a string, in the format: YYYY-MM-DDTHH:MM:SS.ssssssZ
+   */
+  public static String timestampToString(Timestamp timestamp) {
+    // Delegate to the microsecond overload so both variants share one format.
+    return timestampToString(timestampToMicros(timestamp));
+  }
+
+  /**
+   * Transforms a timestamp into a string, whose formatting and timezone is 
consistent
+   * across Kudu.
    * @param micros the timestamp, in microseconds
    * @return a string, in the format: YYYY-MM-DDTHH:MM:SS.ssssssZ
    */
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
index 57a44be..7ed7b55 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
@@ -17,34 +17,52 @@
 package org.apache.kudu.client;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Common;
+import org.apache.kudu.Common.DataType;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
+import org.apache.kudu.client.Operation.ChangeType;
 import org.apache.kudu.test.ClientTestUtil;
 import org.apache.kudu.test.KuduTestHarness;
 import org.apache.kudu.test.RandomUtils;
 import org.apache.kudu.util.DataGenerator;
+import org.apache.kudu.util.Pair;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 
+import static 
org.apache.kudu.client.AsyncKuduScanner.DEFAULT_IS_DELETED_COL_NAME;
 import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
+import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 
 public class TestKuduScanner {
+  private static final Logger LOG = LoggerFactory.getLogger(TestKuduScanner.class);
+
   private static final String tableName = "TestKuduScanner";
 
-  private static final Schema basicSchema = ClientTestUtil.getBasicSchema();
+  private static final int DIFF_FLUSH_SEC = 1;
 
   private KuduClient client;
+  private Random random;
+  private DataGenerator generator;
 
   @Rule
   public KuduTestHarness harness = new KuduTestHarness();
@@ -52,11 +70,15 @@ public class TestKuduScanner {
   @Before
   public void setUp() {
     client = harness.getClient();
+    random = RandomUtils.getRandom();
+    generator = new DataGenerator.DataGeneratorBuilder()
+        .random(random) // same Random instance the test methods draw from
+        .build();
   }
 
   @Test(timeout = 100000)
   public void testIterable() throws Exception {
-    KuduTable table = client.createTable(tableName, basicSchema, 
getBasicCreateTableOptions());
+    KuduTable table = client.createTable(tableName, getBasicSchema(), 
getBasicCreateTableOptions());
     DataGenerator generator = new DataGenerator.DataGeneratorBuilder()
         .random(RandomUtils.getRandom())
         .build();
@@ -153,4 +175,233 @@ public class TestKuduScanner {
       }
     }
   }
+
+  @Test(timeout = 100000)
+  @KuduTestHarness.TabletServerConfig(flags = { "--flush_threshold_secs=" + 
DIFF_FLUSH_SEC })
+  public void testDiffScan() throws Exception {
+    Schema schema = new Schema(Arrays.asList(
+        new ColumnSchema.ColumnSchemaBuilder("key", 
Type.INT32).key(true).build(),
+        // Include a column with the default IS_DELETED column name to test 
collision handling.
+        new ColumnSchema.ColumnSchemaBuilder(DEFAULT_IS_DELETED_COL_NAME, 
Type.INT32).build()
+    ));
+
+    KuduTable table = client.createTable(tableName, schema, 
getBasicCreateTableOptions());
+
+    // Generate some rows before the start time; the diff scan must skip them.
+    int beforeBounds = 5;
+    List<Operation> beforeOps = generateMutationOperations(table, 
random.nextInt(beforeBounds),
+        random.nextInt(beforeBounds), random.nextInt(beforeBounds));
+    Map<Integer, ChangeType> before = applyOperations(beforeOps);
+    LOG.info("Before: {}", before);
+
+    // Set the start timestamp after the initial mutations by getting the 
propagated timestamp,
+    // and incrementing by 1.
+    long startHT = client.getLastPropagatedTimestamp() + 1;
+    LOG.info("startHT: {}", startHT);
+
+    // Generate row mutations.
+    int mutationBounds = 10;
+    int numInserts = random.nextInt(mutationBounds);
+    int numUpdates = random.nextInt(mutationBounds);
+    int numDeletes = random.nextInt(mutationBounds);
+    List<Operation> operations =
+        generateMutationOperations(table, numInserts, numUpdates, numDeletes);
+    Map<Integer, ChangeType> mutations = applyOperations(operations);
+    LOG.info("Mutations: {}", mutations);
+
+    // Set the end timestamp after the test mutations by getting the 
propagated timestamp,
+    // and incrementing by 1.
+    long endHT = client.getLastPropagatedTimestamp() + 1;
+    LOG.info("endHT: {}", endHT);
+
+    // Generate some rows after the end time; the diff scan must skip them.
+    int afterBounds = 5;
+    List<Operation> afterOps = generateMutationOperations(table, 
random.nextInt(afterBounds),
+        random.nextInt(afterBounds), random.nextInt(afterBounds));
+    Map<Integer, ChangeType> after = applyOperations(afterOps);
+    LOG.info("After: {}", after);
+
+    // Diff scan the time range.
+    // Pass through the scan token API to ensure serialization of tokens works 
too.
+    List<KuduScanToken> tokens = client.newScanTokenBuilder(table)
+        .diffScan(startHT, endHT)
+        .build();
+    List<RowResult> results = new ArrayList<>();
+    for (KuduScanToken token : tokens) {
+      KuduScanner scanner = 
KuduScanToken.deserializeIntoScanner(token.serialize(), client);
+
+      // Verify the IS_DELETED column is appended at the end of the projection.
+      Schema projection = scanner.getProjectionSchema();
+      int isDeletedIndex = projection.getIsDeletedIndex();
+      assertEquals(projection.getColumnCount() - 1, isDeletedIndex);
+      // Verify the IS_DELETED column has the correct types.
+      ColumnSchema isDeletedCol = projection.getColumnByIndex(isDeletedIndex);
+      assertEquals(Type.BOOL, isDeletedCol.getType());
+      assertEquals(DataType.IS_DELETED, isDeletedCol.getWireType());
+      // Verify the IS_DELETED column is named to avoid collision.
+      assertEquals(projection.getColumnByIndex(isDeletedIndex),
+          projection.getColumn(DEFAULT_IS_DELETED_COL_NAME + "_"));
+
+      for (RowResult row : scanner) {
+        results.add(row);
+      }
+    }
+    assertEquals(mutations.size(), results.size());
+
+    // Count the results and verify their change type.
+    int resultNumInserts = 0;
+    int resultNumUpdates = 0;
+    int resultNumDeletes = 0;
+    int resultExtra = 0;
+    for (RowResult result : results) {
+      Integer key = result.getInt(0);
+      LOG.info("Processing key {}", key);
+      ChangeType type = mutations.get(key);
+      if (type == ChangeType.INSERT) {
+        assertFalse(result.isDeleted());
+        resultNumInserts++;
+      } else if (type == ChangeType.UPDATE) {
+        assertFalse(result.isDeleted());
+        resultNumUpdates++;
+      } else if (type == ChangeType.DELETE) {
+        assertTrue(result.isDeleted());
+        resultNumDeletes++;
+      } else {
+        // The key was not found in the mutations map. This means that we 
somehow managed to scan
+        // a row that was never mutated. It's an error and will trigger an 
assert below.
+        assertNull(type);
+        resultExtra++;
+      }
+    }
+    assertEquals(numInserts, resultNumInserts);
+    assertEquals(numUpdates, resultNumUpdates);
+    assertEquals(numDeletes, resultNumDeletes);
+    assertEquals(0, resultExtra);
+  }
+
+  /**
+   * Applies a list of Operations and returns the final ChangeType for each key.
+   * @param operations the operations to apply; an empty list yields an empty map.
+   * @return a map of each key and its final ChangeType.
+   */
+  private Map<Integer, ChangeType> applyOperations(List<Operation> operations) throws Exception {
+    Map<Integer, ChangeType> results = new HashMap<>();
+    KuduSession session = client.newSession();
+    // On some runs, wait long enough to flush at the start. Thread.sleep() takes milliseconds.
+    if (random.nextBoolean()) {
+      LOG.info("Waiting for a flush at the start of applyOperations");
+      Thread.sleep((DIFF_FLUSH_SEC + 1) * 1000L);
+    }
+
+    // Pick a flush indicator so we flush once on average; nextInt() rejects a bound of 0.
+    int flushInt = random.nextInt(Math.max(1, operations.size()));
+    for (Operation op : operations) {
+      // On some runs, wait long enough to flush while applying operations.
+      if (random.nextInt(operations.size()) == flushInt) {
+        LOG.info("Waiting for a flush in the middle of applyOperations");
+        Thread.sleep((DIFF_FLUSH_SEC + 1) * 1000L);
+      }
+      OperationResponse resp = session.apply(op);
+      if (resp.hasRowError()) {
+        LOG.error("Could not mutate row: {}", resp.getRowError().getErrorStatus());
+      }
+      assertFalse(resp.hasRowError());
+      results.put(op.getRow().getInt(0), op.getChangeType());
+    }
+    return results;
+  }
+
+  /**
+   * Generates a list of random mutation operations. Any unique row, identified by
+   * its key, could have a random number of operations/mutations. However, the
+   * target count of numInserts, numUpdates and numDeletes will always be achieved
+   * if the entire list of operations is processed.
+   *
+   * @param table the table to generate operations for
+   * @param numInserts The number of row mutations to end with an insert
+   * @param numUpdates The number of row mutations to end with an update
+   * @param numDeletes The number of row mutations to end with a delete
+   * @return a list of random mutation operations
+   */
+  private List<Operation> generateMutationOperations(
+      KuduTable table, int numInserts, int numUpdates, int numDeletes) throws Exception {
+
+    List<Operation> results = new ArrayList<>();
+    List<MutationState> unfinished = new ArrayList<>();
+    int minMutationsBound = 5;
+
+    // Generate operations to initialize all of the rows with inserts.
+    List<Pair<ChangeType, Integer>> changeCounts = Arrays.asList(
+        new Pair<>(ChangeType.INSERT, numInserts),
+        new Pair<>(ChangeType.UPDATE, numUpdates),
+        new Pair<>(ChangeType.DELETE, numDeletes));
+    Set<Integer> keys = new HashSet<>();
+    for (Pair<ChangeType, Integer> changeCount : changeCounts) {
+      ChangeType type = changeCount.getFirst();
+      int count = changeCount.getSecond();
+      for (int i = 0; i < count; i++) {
+        // Generate a random insert, re-rolling the row until its key is new to this call;
+        // a repeated key would break the one-MutationState-per-row bookkeeping below.
+        Insert insert = table.newInsert();
+        PartialRow row = insert.getRow();
+        do {
+          generator.randomizeRow(row);
+        } while (!keys.add(row.getInt(0)));
+        // Queue the insert and start tracking the row's MutationState.
+        results.add(insert);
+        unfinished.add(new MutationState(row.getInt(0), type, random.nextInt(minMutationsBound)));
+      }
+    }
+
+    // Randomly pull from the unfinished list, mutate it and add that operation to the results.
+    // A row leaves the unfinished list once it has been mutated at least the minimum number
+    // of times and its current ChangeType matches its target end type.
+    while (!unfinished.isEmpty()) {
+      // Get a random row to mutate.
+      int index = random.nextInt(unfinished.size());
+      MutationState state = unfinished.get(index);
+
+      // If the row is done, remove it from unfinished and continue.
+      if (state.numMutations >= state.minMutations && state.currentType == state.endType) {
+        unfinished.remove(index);
+        continue;
+      }
+
+      // Otherwise, generate an operation to mutate the row based on its current ChangeType.
+      //    insert -> update|delete
+      //    update -> update|delete
+      //    delete -> insert
+      Operation op;
+      if (state.currentType == ChangeType.INSERT || state.currentType == ChangeType.UPDATE) {
+        op = (random.nextBoolean()) ? table.newUpdate() : table.newDelete();
+      } else {
+        // Must be a delete, so we need an insert next.
+        op = table.newInsert();
+      }
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, state.key);
+      generator.randomizeRow(row, /* randomizeKeys */ false);
+      op.setRow(row);
+      results.add(op);
+
+      state.currentType = op.getChangeType();
+      state.numMutations++;
+    }
+
+    return results;
+  }
+
+  private static class MutationState {
+    ChangeType currentType = ChangeType.INSERT; // type of the most recently generated op
+    int numMutations = 0; // ops generated after the initial insert
+
+    final int key;
+    final ChangeType endType; // the row is finished only once currentType equals this
+    final int minMutations; // lower bound on numMutations before the row can finish
+
+    MutationState(int rowKey, ChangeType finalType, int requiredMutations) {
+      key = rowKey;
+      endType = finalType;
+      minMutations = requiredMutations;
+    }
+  }
 }
diff --git a/src/kudu/client/client.proto b/src/kudu/client/client.proto
index edb3f00..988a471 100644
--- a/src/kudu/client/client.proto
+++ b/src/kudu/client/client.proto
@@ -77,10 +77,16 @@ message ScanTokenPB {
   // See common.proto for further information about read modes.
   optional ReadMode read_mode = 10 [default = READ_LATEST];
 
-  // The requested snapshot timestamp. This is only used O
+  // The requested snapshot timestamp. This is only used
   // when the read mode is set to READ_AT_SNAPSHOT.
   optional fixed64 snap_timestamp = 11;
 
+  // The requested snapshot start timestamp. This is only used
+  // when the read mode is set to READ_AT_SNAPSHOT.
+  // Setting this indicates the scan should be a diff scan; the
+  // snap_timestamp will be used as the snapshot end timestamp.
+  optional fixed64 snap_start_timestamp = 19;
+
   // Sent by clients which previously executed CLIENT_PROPAGATED writes.
   // This updates the server's time so that no transaction will be assigned
   // a timestamp lower than or equal to 'previous_known_timestamp'

Reply via email to