This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new d23ee5d  KUDU-1802: Avoid calls to master when using scan tokens
d23ee5d is described below

commit d23ee5d38ddc4317f431dd65df0c825c00cc968a
Author: Grant Henke <[email protected]>
AuthorDate: Wed Jun 3 20:56:19 2020 -0500

    KUDU-1802: Avoid calls to master when using scan tokens
    
    This patch adds new metadata to the scan token to allow it
    to contain all of the metadata required to construct a KuduTable
    and open a scanner in the clients. This means the GetTableSchema
    and GetTableLocations RPC calls to the master are no longer required
    when using the scan token.
    
    New TableMetadataPB, TabletMetadataPB, and authorization token
    fields were added as optional fields on the token. Additionally a
    `projected_column_idx` field was added that can be used in place
    of the `projected_columns`. This significantly reduces the size of
    the scan token by not duplicating the ColumnSchemaPB that is
    already in the TableMetadataPB.
    
    Adding the table metadata to the scan token is enabled by
    default given it’s more scalable and performant. However,
    it can be disabled in rare cases where more resiliency to
    column renaming is desired. One example where disabling the
    table metadata is used is the backup job. Future work, tracked
    by KUDU-3146, should allow for table metadata to be leveraged in
    those cases as well.
    
    This doesn’t avoid the need for a call to the master to get the
    schema in the case of writing data to Kudu, that work is tracked
    by KUDU-3135. I expect the TableMetadataPB message would
    be used there as well.
    
    I included the ability to disable this functionality in the
    kudu-spark integration via `kudu.useDriverMetadata` just
    in case there are any unforeseen issues or regressions with
    this feature.
    
    I added a test to compare the serialized size of the scan token with
    and without the table and tablet metadata. The size results for a
    100 column table are:
       no metadata: 2697 Bytes
       tablet metadata: 2805
       tablet, table, and authz metadata: 3258
    
    Change-Id: I88c1b8392de37dd5e8b7bd8b78a21603ff8b1d1b
    Reviewed-on: http://gerrit.cloudera.org:8080/16031
    Reviewed-by: Grant Henke <[email protected]>
    Tested-by: Grant Henke <[email protected]>
---
 .../org/apache/kudu/backup/KuduBackupRDD.scala     |   5 +-
 .../org/apache/kudu/client/AsyncKuduClient.java    |  29 ++-
 .../java/org/apache/kudu/client/KuduScanToken.java | 266 ++++++++++++++++++---
 .../org/apache/kudu/client/ProtobufHelper.java     |  50 +++-
 .../java/org/apache/kudu/client/RemoteTablet.java  |   8 +
 .../apache/kudu/client/TableLocationsCache.java    |   2 +-
 .../java/org/apache/kudu/client/TestScanToken.java | 169 +++++++++++--
 .../org/apache/kudu/spark/kudu/DefaultSource.scala |   7 +-
 .../scala/org/apache/kudu/spark/kudu/KuduRDD.scala |   3 +
 .../apache/kudu/spark/kudu/KuduReadOptions.scala   |   7 +-
 src/kudu/client/CMakeLists.txt                     |   1 +
 src/kudu/client/client.cc                          |  10 +
 src/kudu/client/client.h                           |  19 ++
 src/kudu/client/client.proto                       |  74 ++++++
 src/kudu/client/meta_cache.cc                      |  51 ++--
 src/kudu/client/meta_cache.h                       |  24 +-
 src/kudu/client/scan_token-internal.cc             | 224 +++++++++++++++--
 src/kudu/client/scan_token-internal.h              |  12 +-
 src/kudu/client/scan_token-test.cc                 | 197 +++++++++++++--
 19 files changed, 1023 insertions(+), 135 deletions(-)

diff --git 
a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala 
b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
index 459faed..6e87e05 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
@@ -45,7 +45,6 @@ class KuduBackupRDD private[kudu] (
     @transient val sc: SparkContext)
     extends RDD[Row](sc, Nil) {
 
-  // TODO (KUDU-2785): Split large tablets into smaller scan tokens?
   override protected def getPartitions: Array[Partition] = {
     val client = kuduContext.syncClient
 
@@ -58,6 +57,10 @@ class KuduBackupRDD private[kudu] (
       .scanRequestTimeout(options.scanRequestTimeoutMs)
       .prefetching(options.scanPrefetching)
       .keepAlivePeriodMs(options.keepAlivePeriodMs)
+      // TODO(KUDU-3135): Make backup scans a bit more resilient to column 
renames given these
+      //  jobs are often critical, longer running, and scheduled in bulk. Once 
scans with
+      //  provided table metadata better handle column renames this can be 
removed.
+      .includeTableMetadata(false)
 
     options.splitSizeBytes.foreach { size =>
       builder.setSplitSizeBytes(size)
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index e808209..112c9c2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -2288,18 +2288,7 @@ public class AsyncKuduClient implements AutoCloseable {
     String tableId = table.getTableId();
     String tableName = table.getName();
 
-    // Doing a get first instead of putIfAbsent to avoid creating unnecessary
-    // table locations caches because in the most common case the table should
-    // already be present.
-    TableLocationsCache locationsCache = tableLocations.get(tableId);
-    if (locationsCache == null) {
-      locationsCache = new TableLocationsCache();
-      TableLocationsCache existingLocationsCache =
-          tableLocations.putIfAbsent(tableId, locationsCache);
-      if (existingLocationsCache != null) {
-        locationsCache = existingLocationsCache;
-      }
-    }
+    TableLocationsCache locationsCache = 
getOrCreateTableLocationsCache(tableId);
 
     // Build the list of discovered remote tablet instances. If we have
     // already discovered the tablet, its locations are refreshed.
@@ -2385,6 +2374,22 @@ public class AsyncKuduClient implements AutoCloseable {
     }
   }
 
+  TableLocationsCache getOrCreateTableLocationsCache(String tableId) {
+    // Doing a get first instead of putIfAbsent to avoid creating unnecessary
+    // table locations caches because in the most common case the table should
+    // already be present.
+    TableLocationsCache locationsCache = tableLocations.get(tableId);
+    if (locationsCache == null) {
+      locationsCache = new TableLocationsCache();
+      TableLocationsCache existingLocationsCache =
+          tableLocations.putIfAbsent(tableId, locationsCache);
+      if (existingLocationsCache != null) {
+        locationsCache = existingLocationsCache;
+      }
+    }
+    return locationsCache;
+  }
+
   /**
    * Gets the tablet location cache entry for the tablet in the table covering 
a partition key.
    * @param tableId the table
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index 567e99a..62e7bfe 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -18,12 +18,19 @@
 package org.apache.kudu.client;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.UnsafeByteOperations;
@@ -34,6 +41,8 @@ import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Common;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.Client.ScanTokenPB;
+import org.apache.kudu.security.Token;
+import org.apache.kudu.util.NetUtil;
 import org.apache.kudu.util.Pair;
 
 /**
@@ -126,15 +135,11 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
    */
   public static String stringifySerializedToken(byte[] buf, KuduClient client) 
throws IOException {
     ScanTokenPB token = 
ScanTokenPB.parseFrom(CodedInputStream.newInstance(buf));
-    KuduTable table = token.hasTableId() ? 
client.openTableById(token.getTableId()) :
-                                           
client.openTable(token.getTableName());
+    KuduTable table = getKuduTable(token, client);
 
     MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper("ScanToken")
-                                                   .add("table-name", 
token.getTableName());
-
-    if (token.hasTableId()) {
-      helper.add("table-id", token.getTableId());
-    }
+                                                   .add("table-name", 
table.getName());
+    helper.add("table-id", table.getTableId());
 
     if (token.hasLowerBoundPrimaryKey() && 
!token.getLowerBoundPrimaryKey().isEmpty()) {
       helper.add("lower-bound-primary-key",
@@ -162,6 +167,10 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
 
   private static List<Integer> 
computeProjectedColumnIndexesForScanner(ScanTokenPB message,
                                                                        Schema 
schema) {
+    if (message.getProjectedColumnIdxCount() != 0) {
+      return message.getProjectedColumnIdxList();
+    }
+
     List<Integer> columns = new 
ArrayList<>(message.getProjectedColumnsCount());
     for (Common.ColumnSchemaPB colSchemaFromPb : 
message.getProjectedColumnsList()) {
       int colIdx = colSchemaFromPb.hasId() && schema.hasColumnIds() ?
@@ -192,10 +201,55 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
         !message.getFeatureFlagsList().contains(ScanTokenPB.Feature.Unknown),
         "Scan token requires an unsupported feature. This Kudu client must be 
updated.");
 
-    KuduTable table = message.hasTableId() ? 
client.openTableById(message.getTableId()) :
-                                             
client.openTable(message.getTableName());
-    KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);
+    // Use the table metadata from the scan token if it exists,
+    // otherwise call OpenTable to get the metadata from the master.
+    KuduTable table = getKuduTable(message, client);
+
+    // Prime the client tablet location cache if no entry is already present.
+    if (message.hasTabletMetadata()) {
+      Client.TabletMetadataPB tabletMetadata = message.getTabletMetadata();
+      Partition partition =
+          ProtobufHelper.pbToPartition(tabletMetadata.getPartition());
+      if (client.asyncClient.getTableLocationEntry(table.getTableId(),
+          partition.partitionKeyStart) == null) {
+        TableLocationsCache tableLocationsCache =
+            
client.asyncClient.getOrCreateTableLocationsCache(table.getTableId());
+
+        List<LocatedTablet.Replica> replicas = new ArrayList<>();
+        for (Client.TabletMetadataPB.ReplicaMetadataPB replicaMetadataPB :
+            tabletMetadata.getReplicasList()) {
+          Client.ServerMetadataPB server =
+              tabletMetadata.getTabletServers(replicaMetadataPB.getTsIdx());
+          LocatedTablet.Replica replica = new LocatedTablet.Replica(
+              server.getRpcAddresses(0).getHost(),
+              server.getRpcAddresses(0).getPort(),
+              replicaMetadataPB.getRole(), 
replicaMetadataPB.getDimensionLabel());
+          replicas.add(replica);
+        }
+
+        List<ServerInfo> servers = new ArrayList<>();
+        for (Client.ServerMetadataPB serverMetadataPB : 
tabletMetadata.getTabletServersList()) {
+          HostAndPort hostPort =
+              
ProtobufHelper.hostAndPortFromPB(serverMetadataPB.getRpcAddresses(0));
+          final InetAddress inetAddress = 
NetUtil.getInetAddress(hostPort.getHost());
+          ServerInfo serverInfo = new 
ServerInfo(serverMetadataPB.getUuid().toString(),
+              hostPort, inetAddress, serverMetadataPB.getLocation());
+          servers.add(serverInfo);
+        }
+
+        RemoteTablet remoteTablet = new RemoteTablet(table.getTableId(),
+            tabletMetadata.getTabletId(), partition, replicas, servers);
+
+        
tableLocationsCache.cacheTabletLocations(Collections.singletonList(remoteTablet),
+            partition.partitionKeyStart, 1, tabletMetadata.getTtlMillis());
+      }
+    }
 
+    if (message.hasAuthzToken()) {
+      client.asyncClient.getAuthzTokenCache().put(table.getTableId(), 
message.getAuthzToken());
+    }
+
+    KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);
 
     builder.setProjectedColumnIndexes(
         computeProjectedColumnIndexesForScanner(message, table.getSchema()));
@@ -289,6 +343,25 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
     return builder.build();
   }
 
+  private static KuduTable getKuduTable(ScanTokenPB message,
+                                        KuduClient client) throws 
KuduException {
+    // Use the table metadata from the scan token if it exists,
+    // otherwise call OpenTable to get the metadata from the master.
+    if (message.hasTableMetadata()) {
+      Client.TableMetadataPB tableMetadata = message.getTableMetadata();
+      Schema schema = ProtobufHelper.pbToSchema(tableMetadata.getSchema());
+      PartitionSchema partitionSchema =
+          
ProtobufHelper.pbToPartitionSchema(tableMetadata.getPartitionSchema(), schema);
+      return new KuduTable(client.asyncClient, tableMetadata.getTableName(),
+          tableMetadata.getTableId(), schema, partitionSchema,
+          tableMetadata.getNumReplicas(), tableMetadata.getExtraConfigsMap());
+    } else if (message.hasTableId()) {
+      return client.openTableById(message.getTableId());
+    } else {
+      return client.openTable(message.getTableName());
+    }
+  }
+
   @Override
   public int compareTo(KuduScanToken other) {
     if (message.hasTableId() && other.message.hasTableId()) {
@@ -334,6 +407,9 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
     // By default, a scan token is created for each tablet to be scanned.
     private long splitSizeBytes = DEFAULT_SPLIT_SIZE_BYTES;
 
+    private boolean includeTableMetadata = true;
+    private boolean includeTabletMetadata = true;
+
     KuduScanTokenBuilder(AsyncKuduClient client, KuduTable table) {
       super(client, table);
       timeout = client.getDefaultOperationTimeoutMs();
@@ -361,6 +437,28 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
       return this;
     }
 
+    /**
+     * If the table metadata is included on the scan token a GetTableSchema
+     * RPC call to the master can be avoided when deserializing each scan token
+     * into a scanner.
+     * @param includeMetadata true, if table metadata should be included.
+     */
+    public KuduScanTokenBuilder includeTableMetadata(boolean includeMetadata) {
+      this.includeTableMetadata = includeMetadata;
+      return this;
+    }
+
+    /**
+     * If the tablet metadata is included on the scan token a GetTableLocations
+     * RPC call to the master can be avoided when scanning with a scanner 
constructed
+     * from a scan token.
+     * @param includeMetadata true, if tablet metadata should be included.
+     */
+    public KuduScanTokenBuilder includeTabletMetadata(boolean includeMetadata) 
{
+      this.includeTabletMetadata = includeMetadata;
+      return this;
+    }
+
     @Override
     public List<KuduScanToken> build() {
       if (lowerBoundPartitionKey.length != 0 ||
@@ -378,37 +476,80 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
 
       Client.ScanTokenPB.Builder proto = Client.ScanTokenPB.newBuilder();
 
-      proto.setTableId(table.getTableId());
-      proto.setTableName(table.getName());
+      if (includeTableMetadata) {
+        // Set the table metadata so that a call to the master is not needed 
when
+        // deserializing the token into a scanner.
+        Client.TableMetadataPB tableMetadataPB = 
Client.TableMetadataPB.newBuilder()
+            .setTableId(table.getTableId())
+            .setTableName(table.getName())
+            .setNumReplicas(table.getNumReplicas())
+            .setSchema(ProtobufHelper.schemaToPb(table.getSchema()))
+            
.setPartitionSchema(ProtobufHelper.partitionSchemaToPb(table.getPartitionSchema()))
+            .putAllExtraConfigs(table.getExtraConfig())
+            .build();
+        proto.setTableMetadata(tableMetadataPB);
+
+        // Only include the authz token if the table metadata is included.
+        // It is returned in the required GetTableSchema request otherwise.
+        Token.SignedTokenPB authzToken = 
client.getAuthzToken(table.getTableId());
+        if (authzToken != null) {
+          proto.setAuthzToken(authzToken);
+        }
+      } else {
+        // If we add the table metadata, we don't need to set the old table id
+        // and table name. It is expected that the creation and use of a scan 
token
+        // will be on the same or compatible versions.
+        proto.setTableId(table.getTableId());
+        proto.setTableName(table.getName());
+      }
 
       // Map the column names or indices to actual columns in the table schema.
       // If the user did not set either projection, then scan all columns.
       Schema schema = table.getSchema();
-      if (projectedColumnNames != null) {
-        for (String columnName : projectedColumnNames) {
-          ColumnSchema columnSchema = schema.getColumn(columnName);
-          Preconditions.checkArgument(columnSchema != null, "unknown column 
i%s", columnName);
-          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
-                                    schema.hasColumnIds() ? 
schema.getColumnId(columnName) : -1,
-                                    columnSchema);
-        }
-      } else if (projectedColumnIndexes != null) {
-        for (int columnIdx : projectedColumnIndexes) {
-          ColumnSchema columnSchema = schema.getColumnByIndex(columnIdx);
-          Preconditions.checkArgument(columnSchema != null, "unknown column 
index %s", columnIdx);
-          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
-                                    schema.hasColumnIds() ?
-                                        
schema.getColumnId(columnSchema.getName()) :
-                                        -1,
-                                    columnSchema);
+      if (includeTableMetadata) {
+        // If the table metadata is included, then the column indexes can be
+        // used instead of duplicating the ColumnSchemaPBs in the serialized
+        // scan token.
+        if (projectedColumnNames != null) {
+          for (String columnName : projectedColumnNames) {
+            proto.addProjectedColumnIdx(schema.getColumnIndex(columnName));
+          }
+        } else if (projectedColumnIndexes != null) {
+          proto.addAllProjectedColumnIdx(projectedColumnIndexes);
+        } else {
+          List<Integer> indexes = IntStream.range(0, schema.getColumnCount())
+              .boxed().collect(Collectors.toList());
+          proto.addAllProjectedColumnIdx(indexes);
         }
       } else {
-        for (ColumnSchema column : schema.getColumns()) {
-          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
-                                    schema.hasColumnIds() ?
-                                        schema.getColumnId(column.getName()) :
-                                        -1,
-                                    column);
+        if (projectedColumnNames != null) {
+          for (String columnName : projectedColumnNames) {
+            ColumnSchema columnSchema = schema.getColumn(columnName);
+            Preconditions.checkArgument(columnSchema != null,
+                "unknown column i%s", columnName);
+            ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
+                schema.hasColumnIds() ? schema.getColumnId(columnName) : -1,
+                columnSchema);
+          }
+        } else if (projectedColumnIndexes != null) {
+          for (int columnIdx : projectedColumnIndexes) {
+            ColumnSchema columnSchema = schema.getColumnByIndex(columnIdx);
+            Preconditions.checkArgument(columnSchema != null,
+                "unknown column index %s", columnIdx);
+            ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
+                schema.hasColumnIds() ?
+                    schema.getColumnId(columnSchema.getName()) :
+                    -1,
+                columnSchema);
+          }
+        } else {
+          for (ColumnSchema column : schema.getColumns()) {
+            ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
+                schema.hasColumnIds() ?
+                    schema.getColumnId(column.getName()) :
+                    -1,
+                column);
+          }
         }
       }
 
@@ -492,6 +633,61 @@ public class KuduScanToken implements 
Comparable<KuduScanToken> {
           if (primaryKeyEnd != null && primaryKeyEnd.length > 0) {
             
builder.setUpperBoundPrimaryKey(UnsafeByteOperations.unsafeWrap(primaryKeyEnd));
           }
+
+          LocatedTablet tablet = keyRange.getTablet();
+
+          // Set the tablet metadata so that a call to the master is not 
needed to
+          // locate the tablet to scan when opening the scanner.
+          if (includeTabletMetadata) {
+            TableLocationsCache.Entry entry = 
client.getTableLocationEntry(table.getTableId(),
+                tablet.getPartition().partitionKeyStart);
+            if (entry != null && !entry.isNonCoveredRange() && 
!entry.isStale()) {
+              RemoteTablet remoteTablet = entry.getTablet();
+
+              // Build the list of server metadata.
+              List<Client.ServerMetadataPB> servers = new ArrayList<>();
+              Map<HostAndPort, Integer> serverIndexMap = new HashMap<>();
+              List<ServerInfo> tabletServers = 
remoteTablet.getTabletServersCopy();
+              for (int i = 0; i < tabletServers.size(); i++) {
+                ServerInfo serverInfo = tabletServers.get(i);
+                Client.ServerMetadataPB serverMetadataPB =
+                    Client.ServerMetadataPB.newBuilder()
+                        .setUuid(ByteString.copyFromUtf8(serverInfo.getUuid()))
+                        .addRpcAddresses(
+                            
ProtobufHelper.hostAndPortToPB(serverInfo.getHostAndPort()))
+                        .setLocation(serverInfo.getLocation())
+                      .build();
+                servers.add(serverMetadataPB);
+                serverIndexMap.put(serverInfo.getHostAndPort(), i);
+              }
+
+              // Build the list of replica metadata.
+              List<Client.TabletMetadataPB.ReplicaMetadataPB> replicas = new 
ArrayList<>();
+              for (LocatedTablet.Replica replica : remoteTablet.getReplicas()) 
{
+                Integer serverIndex = serverIndexMap.get(
+                    new HostAndPort(replica.getRpcHost(), 
replica.getRpcPort()));
+                Client.TabletMetadataPB.ReplicaMetadataPB.Builder 
tabletMetadataBuilder =
+                    Client.TabletMetadataPB.ReplicaMetadataPB.newBuilder()
+                        .setRole(replica.getRoleAsEnum())
+                        .setTsIdx(serverIndex);
+                if (replica.getDimensionLabel() != null) {
+                  
tabletMetadataBuilder.setDimensionLabel(replica.getDimensionLabel());
+                }
+                replicas.add(tabletMetadataBuilder.build());
+              }
+
+              // Build the tablet metadata and add it to the token.
+              Client.TabletMetadataPB tabletMetadataPB = 
Client.TabletMetadataPB.newBuilder()
+                  .setTabletId(remoteTablet.getTabletId())
+                  
.setPartition(ProtobufHelper.partitionToPb(remoteTablet.getPartition()))
+                  .addAllReplicas(replicas)
+                  .addAllTabletServers(servers)
+                  .setTtlMillis(entry.ttl())
+                  .build();
+              builder.setTabletMetadata(tabletMetadataPB);
+            }
+          }
+
           tokens.add(new KuduScanToken(keyRange.getTablet(), builder.build()));
         }
         return tokens;
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
index 0d58bbc..ae5ec8f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
@@ -204,7 +204,7 @@ public class ProtobufHelper {
    * @param pb the partition schema protobuf message
    * @return a partition instance
    */
-  static PartitionSchema pbToPartitionSchema(Common.PartitionSchemaPB pb, 
Schema schema) {
+  public static PartitionSchema pbToPartitionSchema(Common.PartitionSchemaPB 
pb, Schema schema) {
     List<Integer> rangeColumns = pbToIds(pb.getRangeSchema().getColumnsList());
     PartitionSchema.RangeSchema rangeSchema = new 
PartitionSchema.RangeSchema(rangeColumns);
 
@@ -225,6 +225,28 @@ public class ProtobufHelper {
     return new PartitionSchema(rangeSchema, hashSchemas.build(), schema);
   }
 
+  public static Common.PartitionSchemaPB partitionSchemaToPb(PartitionSchema 
partitionSchema) {
+    Common.PartitionSchemaPB.Builder builder = 
Common.PartitionSchemaPB.newBuilder();
+
+    for (PartitionSchema.HashBucketSchema hashBucketSchema :
+        partitionSchema.getHashBucketSchemas()) {
+      Common.PartitionSchemaPB.HashBucketSchemaPB.Builder hbsBuilder =
+          Common.PartitionSchemaPB.HashBucketSchemaPB.newBuilder()
+                .addAllColumns(idsToPb(hashBucketSchema.getColumnIds()))
+                .setNumBuckets(hashBucketSchema.getNumBuckets())
+                .setSeed(hashBucketSchema.getSeed());
+      builder.addHashBucketSchemas(hbsBuilder.build());
+    }
+
+    Common.PartitionSchemaPB.RangeSchemaPB rangeSchemaPB =
+        Common.PartitionSchemaPB.RangeSchemaPB.newBuilder()
+              
.addAllColumns(idsToPb(partitionSchema.getRangeSchema().getColumnIds()))
+              .build();
+    builder.setRangeSchema(rangeSchemaPB);
+
+    return builder.build();
+  }
+
   /**
    * Constructs a new {@code Partition} instance from the a protobuf message.
    * @param pb the protobuf message
@@ -236,6 +258,14 @@ public class ProtobufHelper {
                          pb.getHashBucketsList());
   }
 
+  static Common.PartitionPB partitionToPb(Partition partition) {
+    return Common.PartitionPB.newBuilder()
+        
.setPartitionKeyStart(ByteString.copyFrom(partition.getPartitionKeyStart()))
+        
.setPartitionKeyEnd(ByteString.copyFrom(partition.getPartitionKeyEnd()))
+        .addAllHashBuckets(partition.getHashBuckets())
+        .build();
+  }
+
   /**
    * Deserializes a list of column identifier protobufs into a list of column 
IDs. This method
    * relies on the fact that the master will aways send a partition schema 
with column IDs, and not
@@ -265,6 +295,24 @@ public class ProtobufHelper {
     return columnIds.build();
   }
 
+  /**
+   * Serializes a list of column IDs into a list of column identifier 
protobufs.
+   *
+   * @param columnIds the column IDs
+   * @return the column identifiers
+   */
+  private static List<Common.PartitionSchemaPB.ColumnIdentifierPB> idsToPb(
+      List<Integer> columnIds) {
+    ImmutableList.Builder<Common.PartitionSchemaPB.ColumnIdentifierPB> 
columnIdentifiers =
+        ImmutableList.builder();
+    for (Integer id : columnIds) {
+      Common.PartitionSchemaPB.ColumnIdentifierPB columnIdentifierPB =
+          
Common.PartitionSchemaPB.ColumnIdentifierPB.newBuilder().setId(id).build();
+      columnIdentifiers.add(columnIdentifierPB);
+    }
+    return columnIdentifiers.build();
+  }
+
   private static byte[] objectToWireFormat(ColumnSchema col, Object value) {
     switch (col.getType()) {
       case BOOL:
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index 77c6406..29f33d4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -282,6 +282,14 @@ public class RemoteTablet implements 
Comparable<RemoteTablet> {
     return tabletId.getBytes(UTF_8);
   }
 
+  List<ServerInfo> getTabletServersCopy() {
+    List<ServerInfo> results = new ArrayList<>();
+    synchronized (tabletServers) {
+      results.addAll(tabletServers.values());
+    }
+    return results;
+  }
+
   @Override
   public int compareTo(RemoteTablet remoteTablet) {
     if (remoteTablet == null) {
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
index 03d80dc..a138dbd 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
@@ -298,7 +298,7 @@ class TableLocationsCache {
       return tablet == null ? upperBoundPartitionKey : 
tablet.getPartition().getPartitionKeyEnd();
     }
 
-    private long ttl() {
+    long ttl() {
       return TimeUnit.NANOSECONDS.toMillis(deadline - ticker.read());
     }
 
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
index 80c7141..1c75992 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
@@ -27,6 +27,8 @@ import static 
org.apache.kudu.test.ClientTestUtil.loadDefaultTable;
 import static org.apache.kudu.test.MetricTestUtils.totalRequestCount;
 import static org.apache.kudu.test.MetricTestUtils.validateRequestCount;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -46,6 +48,7 @@ import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.cluster.MiniKuduCluster;
 
 public class TestScanToken {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestKuduClient.class);
@@ -55,8 +58,15 @@ public class TestScanToken {
   private KuduClient client;
   private AsyncKuduClient asyncClient;
 
+  // Enable Kerberos and access control so we can validate the requests in 
secure environment.
+  // Specifically that authz tokens in the scan tokens work.
+  private static final MiniKuduCluster.MiniKuduClusterBuilder clusterBuilder =
+      KuduTestHarness.getBaseClusterBuilder()
+          .enableKerberos()
+          .addTabletServerFlag("--tserver_enforce_access_control=true");
+
   @Rule
-  public KuduTestHarness harness = new KuduTestHarness();
+  public KuduTestHarness harness = new KuduTestHarness(clusterBuilder);
 
   @Before
   public void setUp() {
@@ -177,9 +187,9 @@ public class TestScanToken {
     List<KuduScanToken> tokens = tokenBuilder.build();
     assertEquals(6, tokens.size());
     assertEquals('f' - 'a' + 'z' - 'h',
-                 countScanTokenRows(tokens,
-                     client.getMasterAddressesAsString(),
-                     client.getDefaultOperationTimeoutMs()));
+        countScanTokenRows(tokens,
+            client.getMasterAddressesAsString(),
+            client.getDefaultOperationTimeoutMs()));
 
     for (KuduScanToken token : tokens) {
       // Sanity check to make sure the debug printing does not throw.
@@ -205,9 +215,12 @@ public class TestScanToken {
     KuduTable table = client.openTable(testTableName);
 
     KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
client.newScanTokenBuilder(table);
-    List<KuduScanToken> tokens = tokenBuilder.build();
+    List<KuduScanToken> tokens = 
tokenBuilder.includeTableMetadata(false).build();
+    List<KuduScanToken> tokensWithMetadata = 
tokenBuilder.includeTableMetadata(true).build();
     assertEquals(1, tokens.size());
+    assertEquals(1, tokensWithMetadata.size());
     KuduScanToken token = tokens.get(0);
+    KuduScanToken tokenWithMetadata = tokensWithMetadata.get(0);
 
     // Drop a column
     client.alterTable(testTableName, new AlterTableOptions().dropColumn("a"));
@@ -217,6 +230,13 @@ public class TestScanToken {
     } catch (IllegalArgumentException e) {
       assertTrue(e.getMessage().contains("Unknown column"));
     }
+    try {
+      KuduScanner scanner = tokenWithMetadata.intoScanner(client);
+      countRowsInScan(scanner);
+      fail();
+    } catch (KuduException e) {
+      assertTrue(e.getMessage().contains("Some columns are not present in the 
current schema: a"));
+    }
 
     // Add a column with the same name, type, and nullability. It will have a 
different id-- it's a
     // different column-- so the scan token will fail.
@@ -251,6 +271,9 @@ public class TestScanToken {
     KuduTable table = client.openTable(testTableName);
 
     KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
client.newScanTokenBuilder(table);
+    // TODO(KUDU-3146): Disable including the table metadata so the new column 
name is retrieved
+    //  when deserializing the scanner.
+    tokenBuilder.includeTableMetadata(false);
     List<KuduScanToken> tokens = tokenBuilder.build();
     assertEquals(1, tokens.size());
     KuduScanToken token = tokens.get(0);
@@ -261,7 +284,7 @@ public class TestScanToken {
 
     KuduScanner scanner = token.intoScanner(client);
 
-    // TODO(wdberkeley): Handle renaming a column between when the token is 
rehydrated as a scanner
+    // TODO(KUDU-3146): Handle renaming a column between when the token is 
rehydrated as a scanner
     //  and when the scanner first hits a replica. Note that this is almost 
certainly a very
     //  short period of vulnerability.
 
@@ -368,7 +391,9 @@ public class TestScanToken {
         scannedRows <= 2 * numRows / 3);
   }
 
-  /** Test that scanRequestTimeout makes it from the scan token to the 
underlying Scanner class. */
+  /**
+   * Test that scanRequestTimeout makes it from the scan token to the 
underlying Scanner class.
+   */
   @Test
   public void testScanRequestTimeout() throws IOException {
     final int NUM_ROWS_DESIRED = 100;
@@ -391,7 +416,7 @@ public class TestScanToken {
                                       KuduTable table,
                                       int numRows) throws Exception {
     KuduSession session = client.newSession();
-    for (int i = 0 ; i < numRows / 2; i++) {
+    for (int i = 0; i < numRows / 2; i++) {
       session.apply(createBasicSchemaInsert(table, i));
     }
 
@@ -429,7 +454,9 @@ public class TestScanToken {
     assertEquals(numExpectedDeletes, numDeletes);
   }
 
-  /** Test that scan tokens work with diff scans. */
+  /**
+   * Test that scan tokens work with diff scans.
+   */
   @Test
   public void testDiffScanTokens() throws Exception {
     Schema schema = getBasicSchema();
@@ -446,6 +473,9 @@ public class TestScanToken {
     // the last row inserted in the first group of ops, and increment the end 
timestamp to include
     // the last row deleted in the second group of ops.
     List<KuduScanToken> tokens = client.newScanTokenBuilder(table)
+        // TODO(KUDU-3146): Disable including the table metadata so the new 
column name is
+        //  retrieved when deserializing the scanner.
+        .includeTableMetadata(false)
         .diffScan(timestamp + 1, client.getLastPropagatedTimestamp() + 1)
         .build();
     assertEquals(1, tokens.size());
@@ -453,7 +483,9 @@ public class TestScanToken {
     checkDiffScanResults(tokens.get(0).intoScanner(client), 3 * numRows / 4, 
numRows / 4);
   }
 
-  /** Test that scan tokens work with diff scans even when columns are 
renamed. */
+  /**
+   * Test that scan tokens work with diff scans even when columns are renamed.
+   */
   @Test
   public void testDiffScanTokensConcurrentColumnRename() throws Exception {
     Schema schema = getBasicSchema();
@@ -470,13 +502,16 @@ public class TestScanToken {
     // the last row inserted in the first group of ops, and increment the end 
timestamp to include
     // the last row deleted in the second group of ops.
     List<KuduScanToken> tokens = client.newScanTokenBuilder(table)
+        // TODO(KUDU-3146): Disable including the table metadata so the new 
column name is
+        //  retrieved when deserializing the scanner.
+        .includeTableMetadata(false)
         .diffScan(timestamp + 1, client.getLastPropagatedTimestamp() + 1)
         .build();
     assertEquals(1, tokens.size());
 
     // Rename a column between when the token is created and when it is 
rehydrated into a scanner
     client.alterTable(table.getName(),
-                      new AlterTableOptions().renameColumn("column1_i", 
"column1_i_new"));
+        new AlterTableOptions().renameColumn("column1_i", "column1_i_new"));
 
     KuduScanner scanner = tokens.get(0).intoScanner(client);
 
@@ -488,7 +523,52 @@ public class TestScanToken {
   }
 
   @Test
-  public void testScanTokenRequests() throws Exception {
+  public void testScanTokenRequestsWithMetadata() throws Exception {
+    Schema schema = getBasicSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.setRangePartitionColumns(ImmutableList.of());
+    createOptions.setNumReplicas(1);
+    KuduTable table = client.createTable(testTableName, schema, createOptions);
+
+    // Use a new client to simulate hydrating in a new process.
+    KuduClient newClient =
+        new 
KuduClient.KuduClientBuilder(harness.getMasterAddressesAsString()).build();
+    newClient.getTablesList(); // List the tables to prevent counting 
initialization RPCs.
+    // Ensure the client doesn't have an authorization token for the table.
+    
assertNull(newClient.asyncClient.getAuthzTokenCache().get(table.getTableId()));
+
+    KuduMetrics.logMetrics(); // Log the metric values to help debug failures.
+    final long beforeRequests = totalRequestCount();
+
+    // Validate that building a scan token results in a single 
GetTableLocations request.
+    KuduScanToken token = validateRequestCount(1, client.getClientId(),
+        "GetTableLocations", () -> {
+          KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
client.newScanTokenBuilder(table);
+          List<KuduScanToken> tokens = 
tokenBuilder.includeTableMetadata(true).build();
+          assertEquals(1, tokens.size());
+          return tokens.get(0);
+        });
+
+    // Validate that hydrating a token doesn't result in a request.
+    KuduScanner scanner = validateRequestCount(0, newClient.getClientId(),
+        () -> token.intoScanner(newClient));
+    // Ensure the client now has an authorization token.
+    
assertNotNull(newClient.asyncClient.getAuthzTokenCache().get(table.getTableId()));
+
+    // Validate that starting to scan results in a Scan request.
+    validateRequestCount(1, newClient.getClientId(), "Scan",
+        scanner::nextRows);
+
+    final long afterRequests = totalRequestCount();
+
+    // Validate no other unexpected requests were sent.
+    // GetTableLocations, Scan.
+    KuduMetrics.logMetrics(); // Log the metric values to help debug failures.
+    assertEquals(2, afterRequests - beforeRequests);
+  }
+
+  @Test
+  public void testScanTokenRequestsNoMetadata() throws Exception {
     Schema schema = getBasicSchema();
     CreateTableOptions createOptions = new CreateTableOptions();
     createOptions.setRangePartitionColumns(ImmutableList.of());
@@ -506,11 +586,14 @@ public class TestScanToken {
     // Validate that building a scan token results in a single 
GetTableLocations request.
     KuduScanToken token = validateRequestCount(1, client.getClientId(),
         "GetTableLocations", () -> {
-        KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
client.newScanTokenBuilder(table);
-        List<KuduScanToken> tokens = tokenBuilder.build();
-        assertEquals(1, tokens.size());
-        return tokens.get(0);
-      });
+          KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
client.newScanTokenBuilder(table);
+          List<KuduScanToken> tokens = tokenBuilder
+              .includeTableMetadata(false)
+              .includeTabletMetadata(false)
+              .build();
+          assertEquals(1, tokens.size());
+          return tokens.get(0);
+        });
 
     // Validate that hydrating a token into a scanner results in a single 
GetTableSchema request.
     KuduScanner scanner = validateRequestCount(1, newClient.getClientId(), 
"GetTableSchema",
@@ -527,4 +610,56 @@ public class TestScanToken {
     KuduMetrics.logMetrics(); // Log the metric values to help debug failures.
     assertEquals(4, afterRequests - beforeRequests);
   }
+
+  @Test
+  public void testScanTokenSize() throws Exception {
+    List<ColumnSchema> columns = new ArrayList<>();
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", 
Type.INT8).key(true).build());
+    for (int i = 0; i < 100; i++) {
+      columns.add(new ColumnSchema.ColumnSchemaBuilder("int64-" + i, 
Type.INT64).build());
+    }
+    Schema schema = new Schema(columns);
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.setRangePartitionColumns(ImmutableList.of());
+    createOptions.setNumReplicas(1);
+    KuduTable table = client.createTable(testTableName, schema, createOptions);
+
+    KuduScanToken.KuduScanTokenBuilder tokenBuilder = 
client.newScanTokenBuilder(table);
+    List<KuduScanToken> tokens = tokenBuilder
+        .includeTabletMetadata(false)
+        .includeTableMetadata(false)
+        .build();
+    assertEquals(1, tokens.size());
+    final byte[] tokenBytes = tokens.get(0).serialize();
+
+    List<KuduScanToken> tokensWithTabletMetadata = tokenBuilder
+        .includeTabletMetadata(true)
+        .includeTableMetadata(false)
+        .build();
+    assertEquals(1, tokensWithTabletMetadata.size());
+    final byte[] tokenWithTabletMetadataBytes = 
tokensWithTabletMetadata.get(0).serialize();
+
+    List<KuduScanToken> tokensWithTableMetadata = tokenBuilder
+        .includeTabletMetadata(false)
+        .includeTableMetadata(true)
+        .build();
+    assertEquals(1, tokensWithTabletMetadata.size());
+    final byte[] tokenWithTableMetadataBytes = 
tokensWithTableMetadata.get(0).serialize();
+
+    List<KuduScanToken> tokensWithAllMetadata = tokenBuilder
+        .includeTabletMetadata(true)
+        .includeTableMetadata(true)
+        .build();
+    assertEquals(1, tokensWithAllMetadata.size());
+    final byte[] tokenWithAllMetadataBytes = 
tokensWithAllMetadata.get(0).serialize();
+
+    LOG.info("tokenBytes: " + tokenBytes.length);
+    LOG.info("tokenWithTabletMetadataBytes: " + 
tokenWithTabletMetadataBytes.length);
+    LOG.info("tokenWithTableMetadataBytes: " + 
tokenWithTableMetadataBytes.length);
+    LOG.info("tokenWithAllMetadataBytes: " + tokenWithAllMetadataBytes.length);
+
+    assertTrue(tokenWithAllMetadataBytes.length > 
tokenWithTableMetadataBytes.length);
+    assertTrue(tokenWithTableMetadataBytes.length > 
tokenWithTabletMetadataBytes.length);
+    assertTrue(tokenWithTabletMetadataBytes.length > tokenBytes.length);
+  }
 }
diff --git 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 9bee9f6..499da90 100644
--- 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -70,6 +70,7 @@ class DefaultSource
   val KEEP_ALIVE_PERIOD_MS = "kudu.keepAlivePeriodMs"
   val SPLIT_SIZE_BYTES = "kudu.splitSizeBytes"
   val HANDLE_SCHEMA_DRIFT = "kudu.handleSchemaDrift"
+  val USE_DRIVER_METADATA = "kudu.useDriverMetadata"
 
   /**
    * A nice alias for the data source so that when specifying the format
@@ -183,7 +184,8 @@ class DefaultSource
     val keepAlivePeriodMs =
       
parameters.get(KEEP_ALIVE_PERIOD_MS).map(_.toLong).getOrElse(defaultKeepAlivePeriodMs)
     val splitSizeBytes = parameters.get(SPLIT_SIZE_BYTES).map(_.toLong)
-
+    val useDriverMetadata =
+      
parameters.get(USE_DRIVER_METADATA).map(_.toBoolean).getOrElse(defaultUseDriverMetadata)
     KuduReadOptions(
       batchSize,
       scanLocality,
@@ -191,7 +193,8 @@ class DefaultSource
       keepAlivePeriodMs,
       scanRequestTimeoutMs,
       /* socketReadTimeoutMs= */ None,
-      splitSizeBytes)
+      splitSizeBytes,
+      useDriverMetadata)
   }
 
   private def getWriteOptions(parameters: Map[String, String]): 
KuduWriteOptions = {
diff --git 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 798f80c..1d9ac64 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -80,6 +80,9 @@ class KuduRDD private[kudu] (
       builder.setSplitSizeBytes(size)
     }
 
+    builder.includeTableMetadata(options.useDriverMetadata)
+    builder.includeTabletMetadata(options.useDriverMetadata)
+
     for (predicate <- predicates) {
       builder.addPredicate(predicate)
     }
diff --git 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
index 8796131..f0eae25 100644
--- 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
+++ 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
@@ -38,6 +38,9 @@ import org.apache.kudu.spark.kudu.KuduReadOptions._
  * @param splitSizeBytes Sets the target number of bytes per spark task. If 
set, tablet's
  *                       primary key range will be split to generate uniform 
task sizes instead of
  *                       the default of 1 task per tablet.
+ * @param useDriverMetadata If true, sends the table metadata from the driver 
to the tasks instead
+ *                          of relying on calls to the Kudu master for each 
task to get the current
+ *                          table metadata.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -48,11 +51,13 @@ case class KuduReadOptions(
     keepAlivePeriodMs: Long = defaultKeepAlivePeriodMs,
     scanRequestTimeoutMs: Option[Long] = None,
     socketReadTimeoutMs: Option[Long] = None,
-    splitSizeBytes: Option[Long] = None)
+    splitSizeBytes: Option[Long] = None,
+    useDriverMetadata: Boolean = defaultUseDriverMetadata)
 
 object KuduReadOptions {
   val defaultBatchSize: Int = 1024 * 1024 * 20 // TODO: Understand/doc this 
setting?
   val defaultScanLocality: ReplicaSelection = ReplicaSelection.CLOSEST_REPLICA
   val defaultFaultTolerantScanner: Boolean = false
   val defaultKeepAlivePeriodMs: Long = 
AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS
+  val defaultUseDriverMetadata: Boolean = true
 }
diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index f456757..acce7f0 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -21,6 +21,7 @@ PROTOBUF_GENERATE_CPP(
   BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
   PROTO_FILES client.proto)
 set(CLIENT_PROTO_LIBS
+  consensus_metadata_proto
   kudu_common_proto
   token_proto)
 ADD_EXPORTABLE_LIBRARY(client_proto
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index ea35f71..e8ee412 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -1911,6 +1911,16 @@ Status KuduScanTokenBuilder::SetTimeoutMillis(int 
millis) {
   return Status::OK();
 }
 
+Status KuduScanTokenBuilder::IncludeTableMetadata(bool include_metadata) {
+  data_->IncludeTableMetadata(include_metadata);
+  return Status::OK();
+}
+
+Status KuduScanTokenBuilder::IncludeTabletMetadata(bool include_metadata) {
+  data_->IncludeTabletMetadata(include_metadata);
+  return Status::OK();
+}
+
 Status KuduScanTokenBuilder::AddConjunctPredicate(KuduPredicate* pred) {
   return data_->mutable_configuration()->AddConjunctPredicate(pred);
 }
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index d8cfb42..6b866bc 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1249,6 +1249,7 @@ class KUDU_EXPORT KuduTable : public 
sp::enable_shared_from_this<KuduTable> {
 
   friend class KuduClient;
   friend class KuduPartitioner;
+  friend class KuduScanToken;
 
   KuduTable(const sp::shared_ptr<KuduClient>& client,
             const std::string& name,
@@ -2658,6 +2659,24 @@ class KUDU_EXPORT KuduScanTokenBuilder {
   /// @copydoc KuduScanner::SetTimeoutMillis
   Status SetTimeoutMillis(int millis) WARN_UNUSED_RESULT;
 
+  /// If the table metadata is included on the scan token a GetTableSchema
+  /// RPC call to the master can be avoided when deserializing each scan token
+  /// into a scanner.
+  ///
+  /// @param [in] include_metadata
+  ///   true, if table metadata should be included.
+  /// @return Operation result status.
+  Status IncludeTableMetadata(bool include_metadata) WARN_UNUSED_RESULT;
+
+  /// If the tablet metadata is included on the scan token a GetTableLocations
+  /// RPC call to the master can be avoided when scanning with a scanner 
constructed
+  /// from a scan token.
+  ///
+  /// @param [in] include_metadata
+  ///   true, if table metadata should be included.
+  /// @return Operation result status.
+  Status IncludeTabletMetadata(bool include_metadata) WARN_UNUSED_RESULT;
+
   /// Build the set of scan tokens.
   ///
   /// The builder may be reused after this call.
diff --git a/src/kudu/client/client.proto b/src/kudu/client/client.proto
index bd72baf..bad0fe3 100644
--- a/src/kudu/client/client.proto
+++ b/src/kudu/client/client.proto
@@ -22,9 +22,66 @@ package kudu.client;
 option java_package = "org.apache.kudu.client";
 
 import "kudu/common/common.proto";
+import "kudu/consensus/metadata.proto";
 import "kudu/security/token.proto";
 import "kudu/util/pb_util.proto";
 
+// All of the metadata required to create a KuduTable object in the C++
+// and Java clients.
+// TODO(KUDU-3135): Use for generic metadata tokens outside of the scan token.
+message TableMetadataPB {
+  optional string table_id = 1;
+  optional string table_name = 2;
+  optional int32 num_replicas = 3;
+
+  optional SchemaPB schema = 4;
+  optional PartitionSchemaPB partition_schema = 5;
+
+  // The table's extra configuration properties.
+  map<string, string> extra_configs = 6;
+}
+
+// Metdata about a single server.
+// This can be used on the client to update the local cache of where
+// each server is located.
+message ServerMetadataPB {
+  optional bytes uuid = 1;
+
+  repeated HostPortPB rpc_addresses = 2;
+
+  // The location assignment for this server.
+  optional string location = 3;
+}
+
+// All of the metadata required to create a RemoteTablet object in the C++
+// and Java clients.
+message TabletMetadataPB {
+
+  optional string tablet_id = 1;
+
+  optional PartitionPB partition = 2;
+
+  // The servers that host the replicas.
+  repeated ServerMetadataPB tablet_servers = 3;
+
+  message ReplicaMetadataPB {
+      // The index of the tablet server in tablet_servers.
+      required uint32 ts_idx = 1;
+      required consensus.RaftPeerPB.Role role = 2;
+      optional string dimension_label = 3;
+  }
+
+  repeated ReplicaMetadataPB replicas = 4;
+
+  // Cached table locations should not live longer than this timeout.
+  // Note: Some time will pass between serializing and deserializing
+  // the token. This means that that effective TTL would be longer
+  // than initially set. This shouldn't be a problem in normal usage
+  // given the time shouldn't be an extended period. Additionally,
+  // the client can retry if any failures occur due to outdated metadata.
+  optional uint64 ttl_millis = 5;
+}
+
 // Serialization format for client scan tokens. Scan tokens are serializable
 // scan descriptors that are used by query engines to plan a set of 
parallizable
 // scanners that are executed on remote task runners. The scan token protobuf
@@ -49,6 +106,20 @@ message ScanTokenPB {
   optional string table_id = 20;
   optional string table_name = 2;
 
+  // Optional table metadata. If provided the scan token can avoid
+  // an extra GetTableSchema RPC call to the master.
+  // If set, neither table_id or table_name above are required.
+  optional TableMetadataPB table_metadata = 21;
+
+  // Optional tablet metadata. If provided the scan token can avoid
+  // an extra GetTableLocations RPC call to the master.
+  optional TabletMetadataPB tablet_metadata = 22;
+
+  // The index of which columns in table_metadata to select.
+  // This can be used in place of projected_columns if this
+  // scan token has table_metadata.
+  repeated int32 projected_column_idx = 23;
+
   // Which columns to select.
   // if this is an empty list, no data will be returned, but the num_rows
   // field of the returned RowBlock will indicate how many rows passed
@@ -119,6 +190,9 @@ message ScanTokenPB {
   // The period, in milliseconds, at which to send keep-alive requests to the 
tablet
   // server to ensure this scanner won't time out.
   optional int64 keep_alive_period_ms = 18;
+
+  // An authorization token with which to authorize the scan requests.
+  optional security.SignedTokenPB authz_token = 24;
 }
 
 // All of the data necessary to authenticate to a cluster from a client with
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 5fc227d..4d24729 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -806,26 +806,39 @@ Status MetaCache::ProcessLookupResponse(const LookupRpc& 
rpc,
   VLOG(2) << "Processing master response for " << rpc.ToString()
           << ". Response: " << pb_util::SecureShortDebugString(rpc.resp());
 
-  MonoTime expiration_time = MonoTime::Now() +
-      MonoDelta::FromMilliseconds(rpc.resp().ttl_millis());
-
-  std::lock_guard<percpu_rwlock> l(lock_);
-  TabletMap& tablets_by_key = LookupOrInsert(&tablets_by_table_and_key_,
-                                             rpc.table_id(), TabletMap());
-
-  const auto& tablet_locations = rpc.resp().tablet_locations();
-
-  if (tablet_locations.empty()) {
+  if (rpc.resp().tablet_locations().empty()) {
     // If there are no tablets in the response, then the table is empty. If
     // there were any tablets in the table they would have been returned, since
     // the master guarantees that if the partition key falls in a non-covered
     // range, the previous tablet will be returned, and we did not set an upper
     // bound partition key on the request.
     DCHECK(!rpc.req().has_partition_key_end());
+  }
+
+  return ProcessGetTableLocationsResponse(rpc.table(), rpc.partition_key(), 
rpc.is_exact_lookup(),
+      rpc.resp(), cache_entry, max_returned_locations);
 
+}
+
+Status MetaCache::ProcessGetTableLocationsResponse(const KuduTable* table,
+                                                   const string& partition_key,
+                                                   bool is_exact_lookup,
+                                                   const 
GetTableLocationsResponsePB& resp,
+                                                   MetaCacheEntry* cache_entry,
+                                                   int max_returned_locations) 
{
+  MonoTime expiration_time = MonoTime::Now() +
+      MonoDelta::FromMilliseconds(resp.ttl_millis());
+
+  std::lock_guard<percpu_rwlock> l(lock_);
+  TabletMap& tablets_by_key = LookupOrInsert(&tablets_by_table_and_key_,
+                                             table->id(), TabletMap());
+
+  const auto& tablet_locations = resp.tablet_locations();
+
+  if (tablet_locations.empty()) {
     tablets_by_key.clear();
     MetaCacheEntry entry(expiration_time, "", "");
-    VLOG(3) << "Caching '" << rpc.table_name() << "' entry " << 
entry.DebugString(rpc.table());
+    VLOG(3) << "Caching '" << table->name() << "' entry " << 
entry.DebugString(table);
     InsertOrDie(&tablets_by_key, "", entry);
   } else {
     // First, update the tserver cache, needed for the Refresh calls below.
@@ -837,7 +850,7 @@ Status MetaCache::ProcessLookupResponse(const LookupRpc& 
rpc,
     }
     // In the case of "interned" replicas, the above 'deprecated_replicas' 
lists will be empty
     // and instead we'll need to update from the top-level list of tservers.
-    const auto& ts_infos = rpc.resp().ts_infos();
+    const auto& ts_infos = resp.ts_infos();
     for (const TSInfoPB& ts_info : ts_infos) {
       UpdateTabletServer(ts_info);
     }
@@ -858,14 +871,14 @@ Status MetaCache::ProcessLookupResponse(const LookupRpc& 
rpc,
     // in A.
 
     const auto& first_lower_bound = 
tablet_locations.Get(0).partition().partition_key_start();
-    if (rpc.partition_key() < first_lower_bound) {
+    if (partition_key < first_lower_bound) {
       // If the first tablet is past the requested partition key, then the
       // partition key falls in an initial non-covered range, such as A.
 
       // Clear any existing entries which overlap with the discovered 
non-covered range.
       tablets_by_key.erase(tablets_by_key.begin(), 
tablets_by_key.lower_bound(first_lower_bound));
       MetaCacheEntry entry(expiration_time, "", first_lower_bound);
-      VLOG(3) << "Caching '" << rpc.table_name() << "' entry " << 
entry.DebugString(rpc.table());
+      VLOG(3) << "Caching '" << table->name() << "' entry " << 
entry.DebugString(table);
       InsertOrDie(&tablets_by_key, "", entry);
     }
 
@@ -885,7 +898,7 @@ Status MetaCache::ProcessLookupResponse(const LookupRpc& 
rpc,
                              tablets_by_key.lower_bound(tablet_lower_bound));
 
         MetaCacheEntry entry(expiration_time, last_upper_bound, 
tablet_lower_bound);
-        VLOG(3) << "Caching '" << rpc.table_name() << "' entry " << 
entry.DebugString(rpc.table());
+        VLOG(3) << "Caching '" << table->name() << "' entry " << 
entry.DebugString(table);
         InsertOrDie(&tablets_by_key, last_upper_bound, entry);
       }
       last_upper_bound = tablet_upper_bound;
@@ -928,7 +941,7 @@ Status MetaCache::ProcessLookupResponse(const LookupRpc& 
rpc,
                                        tablet_id));
 
       MetaCacheEntry entry(expiration_time, remote);
-      VLOG(3) << "Caching '" << rpc.table_name() << "' entry " << 
entry.DebugString(rpc.table());
+      VLOG(3) << "Caching '" << table->name() << "' entry " << 
entry.DebugString(table);
 
       InsertOrDie(&tablets_by_id_, tablet_id, remote);
       InsertOrDie(&tablets_by_key, tablet_lower_bound, entry);
@@ -943,14 +956,14 @@ Status MetaCache::ProcessLookupResponse(const LookupRpc& 
rpc,
                            tablets_by_key.end());
 
       MetaCacheEntry entry(expiration_time, last_upper_bound, "");
-      VLOG(3) << "Caching '" << rpc.table_name() << "' entry " << 
entry.DebugString(rpc.table());
+      VLOG(3) << "Caching '" << table->name() << "' entry " << 
entry.DebugString(table);
       InsertOrDie(&tablets_by_key, last_upper_bound, entry);
     }
   }
 
   // Finally, lookup the discovered entry and return it to the requestor.
-  *cache_entry = FindFloorOrDie(tablets_by_key, rpc.partition_key());
-  if (!rpc.is_exact_lookup() && cache_entry->is_non_covered_range() &&
+  *cache_entry = FindFloorOrDie(tablets_by_key, partition_key);
+  if (!is_exact_lookup && cache_entry->is_non_covered_range() &&
       !cache_entry->upper_bound_partition_key().empty()) {
     *cache_entry = FindFloorOrDie(tablets_by_key, 
cache_entry->upper_bound_partition_key());
     DCHECK(!cache_entry->is_non_covered_range());
diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h
index f02305f..41e608f 100644
--- a/src/kudu/client/meta_cache.h
+++ b/src/kudu/client/meta_cache.h
@@ -56,6 +56,7 @@ class TabletServerServiceProxy;
 namespace master {
 class TSInfoPB;
 class TabletLocationsPB;
+class GetTableLocationsResponsePB;
 } // namespace master
 
 namespace client {
@@ -337,6 +338,10 @@ class MetaCacheEntry {
     }
   }
 
+  const MonoTime& expiration_time() const {
+    return expiration_time_;
+  }
+
   void refresh_expiration_time(MonoTime expiration_time) {
     DCHECK(Initialized());
     DCHECK(expiration_time.Initialized());
@@ -415,6 +420,19 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
                          scoped_refptr<RemoteTablet>* remote_tablet,
                          const StatusCallback& callback);
 
+  // Lookup the given tablet by key, only consulting local information.
+  // Returns true and sets *entry if successful.
+  bool LookupEntryByKeyFastPath(const KuduTable* table,
+                                const std::string& partition_key,
+                                MetaCacheEntry* entry);
+
+  Status ProcessGetTableLocationsResponse(const KuduTable* table,
+                                          const std::string& partition_key,
+                                          bool is_exact_lookup,
+                                          const 
master::GetTableLocationsResponsePB& resp,
+                                          MetaCacheEntry* cache_entry,
+                                          int max_returned_locations);
+
   // Clears the non-covered range entries from a table's meta cache.
   void ClearNonCoveredRangeEntries(const std::string& table_id);
 
@@ -448,12 +466,6 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
                                MetaCacheEntry* cache_entry,
                                int max_returned_locations);
 
-  // Lookup the given tablet by key, only consulting local information.
-  // Returns true and sets *entry if successful.
-  bool LookupEntryByKeyFastPath(const KuduTable* table,
-                                const std::string& partition_key,
-                                MetaCacheEntry* entry);
-
   // Perform the complete fast-path lookup. Returns:
   //  - NotFound if the lookup hits a non-covering range.
   //  - Incomplete if the fast path was not possible
diff --git a/src/kudu/client/scan_token-internal.cc 
b/src/kudu/client/scan_token-internal.cc
index 477ee8b..c8d80c1 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -18,6 +18,7 @@
 #include "kudu/client/scan_token-internal.h"
 
 #include <cstdint>
+#include <map>
 #include <memory>
 #include <ostream>
 #include <string>
@@ -27,6 +28,8 @@
 
 #include <boost/optional/optional.hpp>
 #include <glog/logging.h>
+#include <google/protobuf/stubs/common.h>
+#include <google/protobuf/stubs/port.h>
 
 #include "kudu/client/client-internal.h"
 #include "kudu/client/client.h"
@@ -51,6 +54,7 @@
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/master.pb.h"
+#include "kudu/security/token.pb.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
@@ -58,17 +62,25 @@
 #include "kudu/util/status.h"
 
 using std::string;
+using std::map;
 using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
 namespace kudu {
 
+using master::GetTableLocationsResponsePB;
 using master::TableIdentifierPB;
+using master::TabletLocationsPB;
+using master::TSInfoPB;
+using security::SignedTokenPB;
 
 namespace client {
 
 using internal::MetaCache;
+using internal::MetaCacheEntry;
+using internal::RemoteReplica;
+using internal::RemoteTabletServer;
 
 KuduScanToken::Data::Data(KuduTable* table,
                           ScanTokenPB message,
@@ -109,40 +121,112 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* 
client,
     }
   }
 
-  TableIdentifierPB table_identifier;
-  if (message.has_table_id()) {
-    table_identifier.set_table_id(message.table_id());
+  // Use the table metadata from the scan token if it exists,
+  // otherwise call OpenTable to get the metadata from the master.
+  sp::shared_ptr<KuduTable> table;
+  if (message.has_table_metadata()) {
+    const TableMetadataPB& metadata = message.table_metadata();
+    Schema schema;
+    RETURN_NOT_OK(SchemaFromPB(metadata.schema(), &schema));
+    KuduSchema kudu_schema(schema);
+    PartitionSchema partition_schema;
+    RETURN_NOT_OK(PartitionSchema::FromPB(metadata.partition_schema(), schema,
+        &partition_schema));
+    map<string, string> extra_configs(metadata.extra_configs().begin(),
+        metadata.extra_configs().end());
+    table.reset(new KuduTable(client->shared_from_this(), 
metadata.table_name(),
+        metadata.table_id(), metadata.num_replicas(), kudu_schema, 
partition_schema,
+        extra_configs));
+  } else {
+    TableIdentifierPB table_identifier;
+    if (message.has_table_id()) {
+      table_identifier.set_table_id(message.table_id());
+    }
+    if (message.has_table_name()) {
+      table_identifier.set_table_name(message.table_name());
+    }
+    RETURN_NOT_OK(client->data_->OpenTable(client, table_identifier, &table));
+  }
+
+  // Prime the client tablet location cache if no entry is already present.
+  if (message.has_tablet_metadata()) {
+    const TabletMetadataPB& tablet_metadata = message.tablet_metadata();
+    Partition partition;
+    Partition::FromPB(tablet_metadata.partition(), &partition);
+    MetaCacheEntry entry;
+    if (!client->data_->meta_cache_->LookupEntryByKeyFastPath(table.get(),
+        partition.partition_key_start(), &entry)) {
+      // Generate a fake GetTableLocationsResponsePB to pass to the client
+      // meta cache in order to "inject" the tablet metadata into the client.
+      GetTableLocationsResponsePB mock_resp;
+      mock_resp.set_ttl_millis(tablet_metadata.ttl_millis());
+
+      // Populate the locations.
+      TabletLocationsPB locations_pb;
+      locations_pb.set_tablet_id(tablet_metadata.tablet_id());
+      PartitionPB partition_pb;
+      partition.ToPB(&partition_pb);
+      *locations_pb.mutable_partition() = std::move(partition_pb);
+      for (const TabletMetadataPB::ReplicaMetadataPB& replica_meta : 
tablet_metadata.replicas()) {
+        TabletLocationsPB::InternedReplicaPB replica_pb;
+        replica_pb.set_ts_info_idx(replica_meta.ts_idx());
+        replica_pb.set_role(replica_meta.role());
+        if (replica_meta.has_dimension_label()) {
+          replica_pb.set_dimension_label(replica_meta.dimension_label());
+        }
+        *locations_pb.add_interned_replicas() = std::move(replica_pb);
+      }
+      *mock_resp.add_tablet_locations() = std::move(locations_pb);
+
+      // Populate the servers.
+      for (const ServerMetadataPB& server_meta : 
tablet_metadata.tablet_servers()) {
+        TSInfoPB server_pb;
+        server_pb.set_permanent_uuid(server_meta.uuid());
+        server_pb.set_location(server_meta.location());
+        for (const HostPortPB& host_port :server_meta.rpc_addresses()) {
+          *server_pb.add_rpc_addresses() = host_port;
+        }
+        *mock_resp.add_ts_infos() = std::move(server_pb);
+      }
+
+      client->data_->meta_cache_->ProcessGetTableLocationsResponse(
+          table.get(), partition.partition_key_start(), true, mock_resp, 
&entry, 1);
+    }
   }
-  if (message.has_table_name()) {
-    table_identifier.set_table_name(message.table_name());
+
+  if (message.has_authz_token()) {
+    client->data_->StoreAuthzToken(table->id(), message.authz_token());
   }
-  sp::shared_ptr<KuduTable> table;
-  RETURN_NOT_OK(client->data_->OpenTable(client,
-                                         table_identifier,
-                                         &table));
+
   Schema* schema = table->schema().schema_;
 
   unique_ptr<KuduScanner> scan_builder(new KuduScanner(table.get()));
 
   vector<int> column_indexes;
-  for (const ColumnSchemaPB& column : message.projected_columns()) {
-    int column_idx = schema->find_column(column.name());
-    if (column_idx == Schema::kColumnNotFound) {
-      return Status::IllegalState("unknown column in scan token", 
column.name());
+  if (!message.projected_column_idx().empty()) {
+    for (const int column_idx : message.projected_column_idx()) {
+      column_indexes.push_back(column_idx);
     }
-    DataType expected_type = schema->column(column_idx).type_info()->type();
-    if (column.type() != expected_type) {
-      return Status::IllegalState(Substitute(
+  } else {
+    for (const ColumnSchemaPB& column : message.projected_columns()) {
+      int column_idx = schema->find_column(column.name());
+      if (column_idx == Schema::kColumnNotFound) {
+        return Status::IllegalState("unknown column in scan token", 
column.name());
+      }
+      DataType expected_type = schema->column(column_idx).type_info()->type();
+      if (column.type() != expected_type) {
+        return Status::IllegalState(Substitute(
             "invalid type $0 for column '$1' in scan token, expected: $2",
             DataType_Name(column.type()), column.name(), 
DataType_Name(expected_type)));
-    }
-    bool expected_is_nullable = schema->column(column_idx).is_nullable();
-    if (column.is_nullable() != expected_is_nullable) {
-      return Status::IllegalState(Substitute(
+      }
+      bool expected_is_nullable = schema->column(column_idx).is_nullable();
+      if (column.is_nullable() != expected_is_nullable) {
+        return Status::IllegalState(Substitute(
             "invalid nullability for column '$0' in scan token, expected: $1",
             column.name(), expected_is_nullable ? "NULLABLE" : "NOT NULL"));
+      }
+      column_indexes.push_back(column_idx);
     }
-    column_indexes.push_back(column_idx);
   }
   RETURN_NOT_OK(scan_builder->SetProjectedColumnIndexes(column_indexes));
 
@@ -245,10 +329,48 @@ Status 
KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
 
   ScanTokenPB pb;
 
-  pb.set_table_id(table->id());
-  pb.set_table_name(table->name());
-  RETURN_NOT_OK(SchemaToColumnPBs(*configuration_.projection(), 
pb.mutable_projected_columns(),
-                                  SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | 
SCHEMA_PB_WITHOUT_IDS));
+  if (include_table_metadata_) {
+    // Set the table metadata so that a call to the master is not needed when
+    // deserializing the token into a scanner.
+    TableMetadataPB table_pb;
+    table_pb.set_table_id(table->id());
+    table_pb.set_table_name(table->name());
+    table_pb.set_num_replicas(table->num_replicas());
+    SchemaPB schema_pb;
+    RETURN_NOT_OK(SchemaToPB(KuduSchema::ToSchema(table->schema()), 
&schema_pb));
+    *table_pb.mutable_schema() = std::move(schema_pb);
+    PartitionSchemaPB partition_schema_pb;
+    table->partition_schema().ToPB(&partition_schema_pb);
+    table_pb.mutable_partition_schema()->CopyFrom(partition_schema_pb);
+    table_pb.mutable_extra_configs()->insert(table->extra_configs().begin(),
+                                             table->extra_configs().end());
+    *pb.mutable_table_metadata() = std::move(table_pb);
+
+    // Only include the authz token if the table metadata is included.
+    // It is returned in the required GetTableSchema request otherwise.
+    SignedTokenPB authz_token;
+    bool found_authz_token = client->data_->FetchCachedAuthzToken(table->id(), 
&authz_token);
+    if (found_authz_token) {
+      *pb.mutable_authz_token() = std::move(authz_token);
+    }
+  } else {
+    // If we add the table metadata, we don't need to set the old table id
+    // and table name. It is expected that the creation and use of a scan token
+    // will be on the same or compatible versions.
+    pb.set_table_id(table->id());
+    pb.set_table_name(table->name());
+  }
+
+  if (include_table_metadata_) {
+    for (const ColumnSchema& col : configuration_.projection()->columns()) {
+      int column_idx;
+      table->schema().schema_->FindColumn(col.name(), &column_idx);
+      pb.mutable_projected_column_idx()->Add(column_idx);
+    }
+  } else {
+    RETURN_NOT_OK(SchemaToColumnPBs(*configuration_.projection(), 
pb.mutable_projected_columns(),
+        SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS));
+  }
 
   if (configuration_.spec().lower_bound_key()) {
     pb.mutable_lower_bound_primary_key()->assign(
@@ -378,6 +500,58 @@ Status 
KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
         tablet->partition().partition_key_start());
     message.set_upper_bound_partition_key(
         tablet->partition().partition_key_end());
+
+    // Set the tablet metadata so that a call to the master is not needed to
+    // locate the tablet to scan when opening the scanner.
+    if (include_tablet_metadata_) {
+      internal::MetaCacheEntry entry;
+      if (client->data_->meta_cache_->LookupEntryByKeyFastPath(table,
+          tablet->partition().partition_key_start(), &entry)) {
+        if (!entry.is_non_covered_range() && !entry.stale()) {
+          TabletMetadataPB tablet_pb;
+          tablet_pb.set_tablet_id(entry.tablet()->tablet_id());
+          PartitionPB partition_pb;
+          entry.tablet()->partition().ToPB(&partition_pb);
+          *tablet_pb.mutable_partition() = std::move(partition_pb);
+          MonoDelta ttl = entry.expiration_time() - MonoTime::Now();
+          tablet_pb.set_ttl_millis(ttl.ToMilliseconds());
+
+          // Build the list of server metadata.
+          vector<RemoteTabletServer*> servers;
+          map<string, int> server_index_map;
+          entry.tablet()->GetRemoteTabletServers(&servers);
+          for (int i = 0; i < servers.size(); i++) {
+            RemoteTabletServer* server = servers[i];
+            ServerMetadataPB server_pb;
+            server_pb.set_uuid(server->permanent_uuid());
+            server_pb.set_location(server->location());
+            vector<HostPort> host_ports;
+            server->GetHostPorts(&host_ports);
+            for (const HostPort& host_port : host_ports) {
+              *server_pb.add_rpc_addresses() = HostPortToPB(host_port);
+              server_index_map[host_port.ToString()] = i;
+            }
+            *tablet_pb.add_tablet_servers() = std::move(server_pb);
+          }
+
+          // Build the list of replica metadata.
+          vector<RemoteReplica> replicas;
+          entry.tablet()->GetRemoteReplicas(&replicas);
+          for (const RemoteReplica& replica : replicas) {
+            vector<HostPort> host_ports;
+            replica.ts->GetHostPorts(&host_ports);
+            int server_index = server_index_map[host_ports[0].ToString()];
+            TabletMetadataPB::ReplicaMetadataPB replica_pb;
+            replica_pb.set_role(replica.role);
+            replica_pb.set_ts_idx(server_index);
+            *tablet_pb.add_replicas() = std::move(replica_pb);
+          }
+
+          *message.mutable_tablet_metadata() = std::move(tablet_pb);
+        }
+      }
+    }
+
     unique_ptr<KuduScanToken> client_scan_token(new KuduScanToken);
     client_scan_token->data_ =
         new KuduScanToken::Data(table,
diff --git a/src/kudu/client/scan_token-internal.h 
b/src/kudu/client/scan_token-internal.h
index 84b7778..6296da0 100644
--- a/src/kudu/client/scan_token-internal.h
+++ b/src/kudu/client/scan_token-internal.h
@@ -72,8 +72,18 @@ class KuduScanTokenBuilder::Data {
     return &configuration_;
   }
 
- private:
+  void IncludeTableMetadata(bool include_metadata) {
+    include_table_metadata_ = include_metadata;
+  }
+
+  void IncludeTabletMetadata(bool include_metadata) {
+    include_tablet_metadata_ = include_metadata;
+  }
+
+private:
   ScanConfiguration configuration_;
+  bool include_table_metadata_ = true;
+  bool include_tablet_metadata_ = true;
 };
 
 } // namespace client
diff --git a/src/kudu/client/scan_token-test.cc 
b/src/kudu/client/scan_token-test.cc
index 9d18bbd..da1753c 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -25,6 +25,7 @@
 #include <utility>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -42,17 +43,26 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
+#include "kudu/master/master.h"
+#include "kudu/master/mini_master.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_bool(tserver_enforce_access_control);
+
+METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableSchema);
+METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableLocations);
+
 namespace kudu {
 namespace client {
 
@@ -72,6 +82,10 @@ class ScanTokenTest : public KuduTest {
  protected:
 
   void SetUp() override {
+    // Enable access control so we can validate the requests in secure 
environment.
+    // Specifically that authz tokens in the scan tokens work.
+    FLAGS_tserver_enforce_access_control = true;
+
     // Set up the mini cluster
     cluster_.reset(new InternalMiniCluster(env_, 
InternalMiniClusterOptions()));
     ASSERT_OK(cluster_->Start());
@@ -139,6 +153,31 @@ class ScanTokenTest : public KuduTest {
     ASSERT_EQ(tokens.size(), tablet_ids.size());
   }
 
+  static Status IntoUniqueScanner(KuduClient* client,
+                                  const KuduScanToken& token,
+                                  unique_ptr<KuduScanner>* scanner_ptr) {
+    string serialized_token;
+    CHECK_OK(token.Serialize(&serialized_token));
+    KuduScanner* scanner_ptr_raw;
+    RETURN_NOT_OK(KuduScanToken::DeserializeIntoScanner(client,
+                                                        serialized_token,
+                                                        &scanner_ptr_raw));
+    scanner_ptr->reset(scanner_ptr_raw);
+    return Status::OK();
+  }
+
+  uint64_t NumGetTableSchemaRequests() const {
+    const auto& ent = cluster_->mini_master()->master()->metric_entity();
+    return METRIC_handler_latency_kudu_master_MasterService_GetTableSchema
+        .Instantiate(ent)->TotalCount();
+  }
+
+  uint64_t NumGetTableLocationsRequests() const {
+    const auto& ent = cluster_->mini_master()->master()->metric_entity();
+    return METRIC_handler_latency_kudu_master_MasterService_GetTableLocations
+        .Instantiate(ent)->TotalCount();
+  }
+
   shared_ptr<KuduClient> client_;
   unique_ptr<InternalMiniCluster> cluster_;
 };
@@ -193,9 +232,8 @@ TEST_F(ScanTokenTest, TestScanTokens) {
     ASSERT_OK(builder.Build(&tokens));
 
     ASSERT_EQ(8, tokens.size());
-    KuduScanner* scanner_ptr;
-    ASSERT_OK(tokens[0]->IntoKuduScanner(&scanner_ptr));
-    unique_ptr<KuduScanner> scanner(scanner_ptr);
+    unique_ptr<KuduScanner> scanner;
+    ASSERT_OK(IntoUniqueScanner(client_.get(), *tokens[0], &scanner));
     ASSERT_OK(scanner->Open());
     ASSERT_EQ(0, scanner->data_->last_response_.data().num_rows());
   }
@@ -232,6 +270,18 @@ TEST_F(ScanTokenTest, TestScanTokens) {
     NO_FATALS(VerifyTabletInfo(tokens));
   }
 
+  { // disable table metadata
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    ASSERT_OK(builder.IncludeTableMetadata(false));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_EQ(8, tokens.size());
+    ASSERT_EQ(200, CountRows(tokens));
+    NO_FATALS(VerifyTabletInfo(tokens));
+  }
+
   { // range predicate
     vector<KuduScanToken*> tokens;
     ElementDeleter deleter(&tokens);
@@ -585,9 +635,16 @@ TEST_F(ScanTokenTest, TestConcurrentAlterTable) {
   }
 
   vector<KuduScanToken*> tokens;
-  ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
+  vector<KuduScanToken*> tokens_with_metadata;
+  KuduScanTokenBuilder builder(table.get());
+  ASSERT_OK(builder.IncludeTableMetadata(false));
+  ASSERT_OK(builder.Build(&tokens));
+  ASSERT_OK(builder.IncludeTableMetadata(true));
+  ASSERT_OK(builder.Build(&tokens_with_metadata));
   ASSERT_EQ(1, tokens.size());
+  ASSERT_EQ(1, tokens_with_metadata.size());
   unique_ptr<KuduScanToken> token(tokens[0]);
+  unique_ptr<KuduScanToken> token_with_metadata(tokens_with_metadata[0]);
 
   // Drop a column.
   {
@@ -596,10 +653,16 @@ TEST_F(ScanTokenTest, TestConcurrentAlterTable) {
     ASSERT_OK(table_alterer->Alter());
   }
 
-  KuduScanner* scanner_ptr;
-  Status s = token->IntoKuduScanner(&scanner_ptr);
+  unique_ptr<KuduScanner> scanner;
+  Status s = IntoUniqueScanner(client_.get(), *token, &scanner);
   ASSERT_EQ("Illegal state: unknown column in scan token: a", s.ToString());
 
+  unique_ptr<KuduScanner> scanner_with_metadata;
+  ASSERT_OK(IntoUniqueScanner(client_.get(), *token_with_metadata, 
&scanner_with_metadata));
+  s = scanner_with_metadata->Open();
+  ASSERT_EQ("Invalid argument: Some columns are not present in the current 
schema: a",
+      s.ToString());
+
   // Add back the column with the wrong type.
   {
     unique_ptr<KuduTableAlterer> 
table_alterer(client_->NewTableAlterer(kTableName));
@@ -607,7 +670,7 @@ TEST_F(ScanTokenTest, TestConcurrentAlterTable) {
     ASSERT_OK(table_alterer->Alter());
   }
 
-  s = token->IntoKuduScanner(&scanner_ptr);
+  s = IntoUniqueScanner(client_.get(), *token, &scanner);
   ASSERT_EQ("Illegal state: invalid type INT64 for column 'a' in scan token, 
expected: STRING",
             s.ToString());
 
@@ -619,7 +682,7 @@ TEST_F(ScanTokenTest, TestConcurrentAlterTable) {
     ASSERT_OK(table_alterer->Alter());
   }
 
-  s = token->IntoKuduScanner(&scanner_ptr);
+  s = IntoUniqueScanner(client_.get(), *token, &scanner);
   ASSERT_EQ("Illegal state: invalid nullability for column 'a' in scan token, 
expected: NULLABLE",
             s.ToString());
 
@@ -634,8 +697,7 @@ TEST_F(ScanTokenTest, TestConcurrentAlterTable) {
     ASSERT_OK(table_alterer->Alter());
   }
 
-  ASSERT_OK(token->IntoKuduScanner(&scanner_ptr));
-  delete scanner_ptr;
+  ASSERT_OK(IntoUniqueScanner(client_.get(), *token, &scanner));
 }
 
 // Tests the results of creating scan tokens, renaming the table being
@@ -675,14 +737,121 @@ TEST_F(ScanTokenTest, TestConcurrentRenameTable) {
     ASSERT_OK(table_alterer->Alter());
   }
 
-  KuduScanner* scanner_ptr;
-  ASSERT_OK(token->IntoKuduScanner(&scanner_ptr));
+  unique_ptr<KuduScanner> scanner;
+  ASSERT_OK(IntoUniqueScanner(client_.get(), *token, &scanner));
+
   size_t row_count;
-  ASSERT_OK(CountRowsWithRetries(scanner_ptr, &row_count));
+  ASSERT_OK(CountRowsWithRetries(scanner.get(), &row_count));
   ASSERT_EQ(0, row_count);
-  delete scanner_ptr;
 }
 
+TEST_F(ScanTokenTest, TestMasterRequestsWithMetadata) {
+  const char* kTableName = "scan-token-requests";
+  // Create schema
+  KuduSchema schema;
+  {
+    KuduSchemaBuilder builder;
+    
builder.AddColumn("key")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
+    builder.AddColumn("a")->NotNull()->Type(KuduColumnSchema::INT64);
+    ASSERT_OK(builder.Build(&schema));
+  }
+
+  // Create table
+  shared_ptr<KuduTable> table;
+  {
+    unique_ptr<client::KuduTableCreator> 
table_creator(client_->NewTableCreator());
+    ASSERT_OK(table_creator->table_name(kTableName)
+                            .schema(&schema)
+                            .set_range_partition_columns({})
+                            .num_replicas(1)
+                            .Create());
+    ASSERT_OK(client_->OpenTable(kTableName, &table));
+  }
+
+  vector<KuduScanToken*> tokens;
+  KuduScanTokenBuilder builder(table.get());
+  ASSERT_OK(builder.IncludeTableMetadata(true));
+  ASSERT_OK(builder.IncludeTabletMetadata(true));
+  ASSERT_OK(builder.Build(&tokens));
+  ASSERT_EQ(1, tokens.size());
+  unique_ptr<KuduScanToken> token(tokens[0]);
+
+  shared_ptr<KuduClient> new_client;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &new_client));
+  // List the tables to prevent counting initialization RPCs.
+  vector<string> tables;
+  ASSERT_OK(new_client->ListTables(&tables));
+
+  const auto init_schema_requests = NumGetTableSchemaRequests();
+  const auto init_location_requests = NumGetTableLocationsRequests();
+
+  // Validate that hydrating a token doesn't result in a GetTableSchema
+  // or GetTableLocations request.
+  unique_ptr<KuduScanner> scanner;
+  ASSERT_OK(IntoUniqueScanner(new_client.get(), *token, &scanner));
+  ASSERT_EQ(init_schema_requests, NumGetTableSchemaRequests());
+  ASSERT_EQ(init_location_requests, NumGetTableLocationsRequests());
+
+  // Validate that hydrating a token doesn't result in a GetTableSchema
+  // or GetTableLocations request.
+  ASSERT_OK(scanner->Open());
+  KuduScanBatch batch;
+  ASSERT_OK(scanner->NextBatch(&batch));
+  ASSERT_EQ(init_schema_requests, NumGetTableSchemaRequests());
+  ASSERT_EQ(init_location_requests, NumGetTableLocationsRequests());
+}
+
+TEST_F(ScanTokenTest, TestMasterRequestsNoMetadata) {
+  const char* kTableName = "scan-token-requests-no-meta";
+  // Create schema
+  KuduSchema schema;
+  {
+    KuduSchemaBuilder builder;
+    
builder.AddColumn("key")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
+    builder.AddColumn("a")->NotNull()->Type(KuduColumnSchema::INT64);
+    ASSERT_OK(builder.Build(&schema));
+  }
+
+  // Create table
+  shared_ptr<KuduTable> table;
+  {
+    unique_ptr<client::KuduTableCreator> 
table_creator(client_->NewTableCreator());
+    ASSERT_OK(table_creator->table_name(kTableName)
+                  .schema(&schema)
+                  .set_range_partition_columns({})
+                  .num_replicas(1)
+                  .Create());
+    ASSERT_OK(client_->OpenTable(kTableName, &table));
+  }
+
+  shared_ptr<KuduClient> new_client;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &new_client));
+  // List the tables to prevent counting initialization RPCs.
+  vector<string> tables;
+  ASSERT_OK(new_client->ListTables(&tables));
+
+  vector<KuduScanToken*> tokens;
+  KuduScanTokenBuilder builder(table.get());
+  ASSERT_OK(builder.IncludeTableMetadata(false));
+  ASSERT_OK(builder.IncludeTabletMetadata(false));
+  ASSERT_OK(builder.Build(&tokens));
+  ASSERT_EQ(1, tokens.size());
+  unique_ptr<KuduScanToken> token(tokens[0]);
+
+  const auto init_schema_requests = NumGetTableSchemaRequests();
+  const auto init_location_requests = NumGetTableLocationsRequests();
+
+  // Validate that hydrating a token into a scanner results in a single 
GetTableSchema request.
+  unique_ptr<KuduScanner> scanner;
+  ASSERT_OK(IntoUniqueScanner(new_client.get(), *token, &scanner));
+  ASSERT_EQ(init_schema_requests + 1, NumGetTableSchemaRequests());
+
+  // Validate that opening the scanner results in a GetTableLocations request.
+  ASSERT_OK(scanner->Open());
+  KuduScanBatch batch;
+  ASSERT_OK(scanner->NextBatch(&batch));
+  ASSERT_EQ(init_location_requests + 1, NumGetTableLocationsRequests());
+}
 
 } // namespace client
 } // namespace kudu

Reply via email to