java: further cleanup of ConnectToCluster

This changes the individual RPCs which go to masters to directly return
the ConnectToMasterResponsePB. Then, these responses are aggregated in
the ConnectToCluster class, and once a leader is found, a new
ConnectToClusterResponse object is created which represents the result
of the aggregated multi-master operation.

The code to translate from the ConnectToMasterResponsePB is also moved
into the ConnectToClusterResponse class where it's a little more obvious
what it's doing, and called from AsyncKuduClient.

This also addresses a bunch of references in comments to the old RPC
names which got missed in the IDE-driven rename.

Change-Id: Ia51175ee1b4108341959d90545e2d6733d5e80d5
Reviewed-on: http://gerrit.cloudera.org:8080/6053
Reviewed-by: Dan Burkert <[email protected]>
Reviewed-by: Jean-Daniel Cryans <[email protected]>
Tested-by: Todd Lipcon <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3102e383
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3102e383
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3102e383

Branch: refs/heads/master
Commit: 3102e3833bcf32a290b63e8d6cae917b62194ac6
Parents: 66e5756
Author: Todd Lipcon <[email protected]>
Authored: Thu Feb 16 23:19:21 2017 -0800
Committer: Todd Lipcon <[email protected]>
Committed: Wed Feb 22 04:51:37 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  13 ++-
 .../apache/kudu/client/ConnectToCluster.java    | 117 ++++++++-----------
 .../kudu/client/ConnectToClusterResponse.java   |  69 ++++++-----
 .../kudu/client/ConnectToMasterRequest.java     |  51 ++++----
 .../kudu/client/TestConnectToCluster.java       |  57 ++++-----
 5 files changed, 149 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/3102e383/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 4c0d41b..cbb4ad5 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -1060,8 +1060,17 @@ public class AsyncKuduClient implements AutoCloseable {
    * @return An initialized Deferred object to hold the response.
    */
   Deferred<Master.GetTableLocationsResponsePB> 
getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
-    return ConnectToCluster.run(masterAddresses, parentRpc,
-        connectionCache, defaultAdminOperationTimeoutMs);
+    return ConnectToCluster.run(masterAddresses, parentRpc, connectionCache,
+        defaultAdminOperationTimeoutMs)
+        .addCallback(
+            new Callback<Master.GetTableLocationsResponsePB, 
ConnectToClusterResponse>() {
+              @Override
+              public Master.GetTableLocationsResponsePB 
call(ConnectToClusterResponse resp) {
+                // Once we've connected, we translate the located master into 
a TableLocations
+                // since the rest of our locations caching code expects this 
type.
+                return resp.getAsTableLocations();
+              }
+            });
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/3102e383/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index a3987d5..018b531 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -28,17 +28,14 @@ import com.google.common.base.Functions;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.net.HostAndPort;
-import com.google.protobuf.ByteString;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kudu.Common;
 import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.consensus.Metadata;
-import org.apache.kudu.master.Master;
-import org.apache.kudu.master.Master.GetTableLocationsResponsePB;
+import org.apache.kudu.consensus.Metadata.RaftPeerPB.Role;
+import org.apache.kudu.master.Master.ConnectToMasterResponsePB;
 import org.apache.kudu.rpc.RpcHeader.ErrorStatusPB.RpcErrorCodePB;
 import org.apache.kudu.util.NetUtil;
 
@@ -52,15 +49,13 @@ final class ConnectToCluster {
   private static final Logger LOG = 
LoggerFactory.getLogger(ConnectToCluster.class);
 
   private final List<HostAndPort> masterAddrs;
-  private final Deferred<Master.GetTableLocationsResponsePB> responseD;
+  private final Deferred<ConnectToClusterResponse> responseD;
   private final int numMasters;
 
   // Used to avoid calling 'responseD' twice.
   private final AtomicBoolean responseDCalled = new AtomicBoolean(false);
 
-  // Number of responses we've receives: used to tell whether or not we've 
received
-  // errors/replies from all of the masters, or if there are any
-  // GetMasterRegistrationRequests still pending.
+  /** Number of responses we've received so far */
   private final AtomicInteger countResponsesReceived = new AtomicInteger(0);
 
   // Exceptions received so far: kept for debugging purposes.
@@ -71,8 +66,6 @@ final class ConnectToCluster {
    * Creates an object that holds the state needed to retrieve master table's 
location.
    * @param masterAddrs Addresses of all master replicas that we want to 
retrieve the
    *                    registration from.
-   * @param responseD Deferred object that will hold the 
GetTableLocationsResponsePB object for
-   *                  the master table.
    */
   ConnectToCluster(List<HostAndPort> masterAddrs) {
     this.masterAddrs = masterAddrs;
@@ -80,11 +73,12 @@ final class ConnectToCluster {
     this.numMasters = masterAddrs.size();
   }
 
-  public Deferred<GetTableLocationsResponsePB> getDeferred() {
+  @VisibleForTesting
+  public Deferred<ConnectToClusterResponse> getDeferred() {
     return responseD;
   }
 
-  private static Deferred<ConnectToClusterResponse> getMasterRegistration(
+  private static Deferred<ConnectToMasterResponsePB> connectToMaster(
       final TabletClient masterClient, KuduRpc<?> parentRpc,
       long defaultTimeoutMs) {
     // TODO: Handle the situation when multiple in-flight RPCs all want to 
query the masters,
@@ -96,15 +90,15 @@ final class ConnectToCluster {
     } else {
       rpc.setTimeoutMillis(defaultTimeoutMs);
     }
-    Deferred<ConnectToClusterResponse> d = rpc.getDeferred();
+    Deferred<ConnectToMasterResponsePB> d = rpc.getDeferred();
     rpc.attempt++;
     masterClient.sendRpc(rpc);
 
-    // If we are connecting to an older version of Kudu, we'll get a 
NO_SUCH_METHOD
+    // If we are connecting to an older version of Kudu, we'll get an invalid 
request
     // error. In that case, we resend using the older version of the RPC.
-    d.addErrback(new Callback<Deferred<ConnectToClusterResponse>, Exception>() 
{
+    d.addErrback(new Callback<Deferred<ConnectToMasterResponsePB>, 
Exception>() {
       @Override
-      public Deferred<ConnectToClusterResponse> call(Exception result)
+      public Deferred<ConnectToMasterResponsePB> call(Exception result)
           throws Exception {
         if (result instanceof RpcRemoteException) {
           RpcRemoteException rre = (RpcRemoteException)result;
@@ -112,7 +106,7 @@ final class ConnectToCluster {
               rre.getErrPB().getUnsupportedFeatureFlagsCount() > 0) {
             AsyncKuduClient.LOG.debug("Falling back to GetMasterRegistration() 
RPC to connect " +
                 "to server running Kudu < 1.3.");
-            Deferred<ConnectToClusterResponse> newAttempt = rpc.getDeferred();
+            Deferred<ConnectToMasterResponsePB> newAttempt = rpc.getDeferred();
             assert newAttempt != null;
             rpc.setUseOldMethod();
             masterClient.sendRpc(rpc);
@@ -127,17 +121,17 @@ final class ConnectToCluster {
   }
 
   /**
-   * Retrieve the master registration (see {@link ConnectToClusterResponse}
-   * from the leader master.
+   * Locate the leader master and retrieve the cluster information
+   * (see {@link ConnectToClusterResponse}.
    *
    * @param masterAddresses the addresses of masters to fetch from
    * @param parentRpc RPC that prompted a master lookup, can be null
    * @param connCache the client's connection cache, used for creating 
connections
    *                  to masters
    * @param defaultTimeoutMs timeout to use for RPCs if the parentRpc has no 
timeout
-   * @return a Deferred object for the master replica's current registration
+   * @return a Deferred object for the cluster connection status
    */
-  public static Deferred<GetTableLocationsResponsePB> run(
+  public static Deferred<ConnectToClusterResponse> run(
       List<HostAndPort> masterAddresses,
       KuduRpc<?> parentRpc,
       ConnectionCache connCache,
@@ -148,7 +142,7 @@ final class ConnectToCluster {
     // waits until it gets a good response before firing the returned
     // deferred.
     for (HostAndPort hostAndPort : masterAddresses) {
-      Deferred<ConnectToClusterResponse> d;
+      Deferred<ConnectToMasterResponsePB> d;
       TabletClient client = connCache.newMasterClient(hostAndPort);
       if (client == null) {
         String message = "Couldn't resolve this master's address " + 
hostAndPort.toString();
@@ -156,7 +150,7 @@ final class ConnectToCluster {
         Status statusIOE = Status.IOError(message);
         d = Deferred.fromError(new NonRecoverableException(statusIOE));
       } else {
-        d = getMasterRegistration(client, parentRpc, defaultTimeoutMs);
+        d = connectToMaster(client, parentRpc, defaultTimeoutMs);
       }
       d.addCallbacks(connector.callbackForNode(hostAndPort),
           connector.errbackForNode(hostAndPort));
@@ -165,32 +159,32 @@ final class ConnectToCluster {
   }
 
   /**
-   * Creates a callback for a GetMasterRegistrationRequest that was sent to 
'hostAndPort'.
-   * @see GetMasterRegistrationCB
+   * Creates a callback for a ConnectToMaster RPC that was sent to 
'hostAndPort'.
+   * @see ConnectToMasterCB
    * @param hostAndPort Host and part for the RPC we're attaching this to. 
Host and port must
    *                    be valid.
    * @return The callback object that can be added to the RPC request.
    */
   @VisibleForTesting
-  Callback<Void, ConnectToClusterResponse> callbackForNode(HostAndPort 
hostAndPort) {
-    return new GetMasterRegistrationCB(hostAndPort);
+  Callback<Void, ConnectToMasterResponsePB> callbackForNode(HostAndPort 
hostAndPort) {
+    return new ConnectToMasterCB(hostAndPort);
   }
 
   /**
-   * Creates an errback for a GetMasterRegistrationRequest that was sent to 
'hostAndPort'.
-   * @see GetMasterRegistrationErrCB
+   * Creates an errback for a ConnectToMaster that was sent to 'hostAndPort'.
+   * @see ConnectToMasterErrCB
    * @param hostAndPort Host and port for the RPC we're attaching this to. 
Used for debugging
    *                    purposes.
    * @return The errback object that can be added to the RPC request.
    */
   @VisibleForTesting
   Callback<Void, Exception> errbackForNode(HostAndPort hostAndPort) {
-    return new GetMasterRegistrationErrCB(hostAndPort);
+    return new ConnectToMasterErrCB(hostAndPort);
   }
 
   /**
    * Checks if we've already received a response or an exception from every 
master that
-   * we've sent a GetMasterRegistrationRequest to. If so -- and no leader has 
been found
+   * we've sent a ConnectToMaster to. If so -- and no leader has been found
    * (that is, 'responseD' was never called) -- pass a {@link 
NoLeaderFoundException}
    * to responseD.
    */
@@ -250,7 +244,7 @@ final class ConnectToCluster {
   }
 
   /**
-   * Callback for each GetMasterRegistrationRequest sent in 
getMasterTableLocations() above.
+   * Callback for each ConnectToCluster RPC sent in connectToMaster() above.
    * If a request (paired to a specific master) returns a reply that indicates 
it's a leader,
    * the callback in 'responseD' is invoked with an initialized 
GetTableLocationResponsePB
    * object containing the leader's RPC address.
@@ -258,65 +252,50 @@ final class ConnectToCluster {
    * the number of masters, pass {@link NoLeaderFoundException} into
    * 'responseD' if no one else had called 'responseD' before; otherwise, do 
nothing.
    */
-  final class GetMasterRegistrationCB implements Callback<Void, 
ConnectToClusterResponse> {
+  final class ConnectToMasterCB implements Callback<Void, 
ConnectToMasterResponsePB> {
     private final HostAndPort hostAndPort;
 
-    public GetMasterRegistrationCB(HostAndPort hostAndPort) {
+    public ConnectToMasterCB(HostAndPort hostAndPort) {
       this.hostAndPort = hostAndPort;
     }
 
     @Override
-    public Void call(ConnectToClusterResponse r) throws Exception {
-      Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder =
-          Master.TabletLocationsPB.ReplicaPB.newBuilder();
-
-      Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder()
-          .addRpcAddresses(ProtobufHelper.hostAndPortToPB(hostAndPort))
-          .setPermanentUuid(ByteString.EMPTY); // required field, but unused 
for master
-
-      replicaBuilder.setTsInfo(tsInfoBuilder);
-      if (r.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) {
-        replicaBuilder.setRole(r.getRole());
-        Master.TabletLocationsPB.Builder locationBuilder = 
Master.TabletLocationsPB.newBuilder();
-        locationBuilder.setPartition(
-            
Common.PartitionPB.newBuilder().setPartitionKeyStart(ByteString.EMPTY)
-                                           
.setPartitionKeyEnd(ByteString.EMPTY));
-        locationBuilder.setTabletId(
-            
ByteString.copyFromUtf8(AsyncKuduClient.MASTER_TABLE_NAME_PLACEHOLDER));
-        locationBuilder.addReplicas(replicaBuilder);
-        // No one else has called this before us.
-        if (responseDCalled.compareAndSet(false, true)) {
-          responseD.callback(
-              
Master.GetTableLocationsResponsePB.newBuilder().addTabletLocations(
-                  locationBuilder.build()).build()
-          );
-        } else {
-          LOG.debug("Callback already invoked, discarding response(" + 
r.toString() + ") from " +
-              hostAndPort.toString());
-        }
-      } else {
+    public Void call(ConnectToMasterResponsePB r) throws Exception {
+      if (!r.getRole().equals(Role.LEADER)) {
         incrementCountAndCheckExhausted();
+        return null;
       }
+      // We found a leader!
+      if (!responseDCalled.compareAndSet(false,  true)) {
+        // Someone else already found a leader. This is somewhat unexpected
+        // because this means two nodes think they're the leader, but it's
+        // not impossible. We'll just ignore it.
+        LOG.debug("Callback already invoked, discarding response(" + 
r.toString() + ") from " +
+            hostAndPort.toString());
+        return null;
+      }
+
+      responseD.callback(new ConnectToClusterResponse(hostAndPort, r));
       return null;
     }
 
     @Override
     public String toString() {
-      return "get master registration for " + hostAndPort.toString();
+      return "ConnectToMasterCB for " + hostAndPort.toString();
     }
   }
 
   /**
-   * Errback for each GetMasterRegistrationRequest sent in 
getMasterTableLocations() above.
+   * Errback for each ConnectToMaster RPC sent in connectToMaster() above.
    * Stores each exception in 'exceptionsReceived'. Increments 
'countResponseReceived': if
    * the count is equal to the number of masters and no one else had called 
'responseD' before,
    * pass a {@link NoLeaderFoundException} into 'responseD'; otherwise, do
    * nothing.
    */
-  final class GetMasterRegistrationErrCB implements Callback<Void, Exception> {
+  final class ConnectToMasterErrCB implements Callback<Void, Exception> {
     private final HostAndPort hostAndPort;
 
-    public GetMasterRegistrationErrCB(HostAndPort hostAndPort) {
+    public ConnectToMasterErrCB(HostAndPort hostAndPort) {
       this.hostAndPort = hostAndPort;
     }
 
@@ -330,7 +309,7 @@ final class ConnectToCluster {
 
     @Override
     public String toString() {
-      return "get master registration errback for " + hostAndPort.toString();
+      return "ConnectToMasterErrCB for " + hostAndPort.toString();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/3102e383/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
index 59cb306..50b483e 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
@@ -17,44 +17,53 @@
 
 package org.apache.kudu.client;
 
-import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.consensus.Metadata;
-import org.apache.kudu.master.Master;
+import com.google.common.net.HostAndPort;
+import com.google.protobuf.ByteString;
+
+import org.apache.kudu.Common.PartitionPB;
+import org.apache.kudu.master.Master.ConnectToMasterResponsePB;
+import org.apache.kudu.master.Master.GetTableLocationsResponsePB;
+import org.apache.kudu.master.Master.TSInfoPB;
+import org.apache.kudu.master.Master.TabletLocationsPB;
+import org.apache.kudu.master.Master.TabletLocationsPB.ReplicaPB;
 
 /**
- * Response for {@link ConnectToMasterRequest}.
+ * The aggregated response after connecting to a cluster. This stores the
+ * identity of the leader master as well as the response from that master.
  */
[email protected]
-public class ConnectToClusterResponse extends KuduRpcResponse {
+class ConnectToClusterResponse {
+  private static final ByteString FAKE_TABLET_ID = ByteString.copyFromUtf8(
+      AsyncKuduClient.MASTER_TABLE_NAME_PLACEHOLDER);
 
-  private final Metadata.RaftPeerPB.Role role;
+  /** The host and port of the master that is currently leader */
+  private final HostAndPort leaderHostAndPort;
+  /** The response from that master */
+  private final ConnectToMasterResponsePB connectResponse;
 
-  /**
-   * Describes a response to a {@link ConnectToMasterRequest}, built from
-   * {@link Master.GetMasterRegistrationResponsePB}.
-   *
-   * @param role Master's role in the config.
-   */
-  public ConnectToClusterResponse(long elapsedMillis, String tsUUID,
-                                  Metadata.RaftPeerPB.Role role) {
-    super(elapsedMillis, tsUUID);
-    this.role = role;
+
+  public ConnectToClusterResponse(HostAndPort hostAndPort,
+      ConnectToMasterResponsePB connectResponse) {
+    super();
+    this.leaderHostAndPort = hostAndPort;
+    this.connectResponse = connectResponse;
   }
 
   /**
-   * Returns this master's role in the config.
-   *
-   * @see Metadata.RaftPeerPB.Role
-   * @return Node's role in the cluster, or FOLLOWER if the node is not 
initialized.
+   * Return the location of the located leader master as if this had been a 
normal
+   * tablet lookup. This is necessary so that we can cache the master location 
as
+   * if it were a tablet.
    */
-  public Metadata.RaftPeerPB.Role getRole() {
-    return role;
-  }
-
-  @Override
-  public String toString() {
-    return "GetMasterRegistrationResponse{" +
-        "role=" + role +
-        '}';
+  public GetTableLocationsResponsePB getAsTableLocations() {
+    return GetTableLocationsResponsePB.newBuilder()
+        .addTabletLocations(TabletLocationsPB.newBuilder()
+            .setPartition(PartitionPB.newBuilder()
+                .setPartitionKeyStart(ByteString.EMPTY)
+                .setPartitionKeyEnd(ByteString.EMPTY))
+            .setTabletId(FAKE_TABLET_ID)
+            .addReplicas(ReplicaPB.newBuilder()
+                .setTsInfo(TSInfoPB.newBuilder()
+                    
.addRpcAddresses(ProtobufHelper.hostAndPortToPB(leaderHostAndPort))
+                    .setPermanentUuid(ByteString.EMPTY)) // required field, 
but unused for master
+                .setRole(connectResponse.getRole()))).build();
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/3102e383/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
index 6ff06e3..00e4537 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kudu.client;
 
-import static org.apache.kudu.consensus.Metadata.RaftPeerPB;
 import static org.apache.kudu.master.Master.GetMasterRegistrationRequestPB;
 import static org.apache.kudu.master.Master.GetMasterRegistrationResponsePB;
-import static org.apache.kudu.master.Master.MasterErrorPB;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -36,7 +34,7 @@ import org.apache.kudu.util.Pair;
  * Package-private RPC that can only go to master.
  */
 @InterfaceAudience.Private
-public class ConnectToMasterRequest extends KuduRpc<ConnectToClusterResponse> {
+public class ConnectToMasterRequest extends KuduRpc<ConnectToMasterResponsePB> 
{
   /**
    * Kudu 1.2 and earlier use GetMasterRegistration to connect to the master.
    */
@@ -81,49 +79,42 @@ public class ConnectToMasterRequest extends 
KuduRpc<ConnectToClusterResponse> {
   }
 
   @Override
-  Pair<ConnectToClusterResponse, Object> deserialize(CallResponse callResponse,
-                                                     String tsUUID) throws 
KuduException {
+  Pair<ConnectToMasterResponsePB, Object> deserialize(CallResponse 
callResponse,
+                                                       String tsUUID) throws 
KuduException {
     if (method == CONNECT_TO_MASTER) {
       return deserializeNewRpc(callResponse, tsUUID);
     }
     return deserializeOldRpc(callResponse, tsUUID);
   }
 
-  private Pair<ConnectToClusterResponse, Object> deserializeNewRpc(
+  private Pair<ConnectToMasterResponsePB, Object> deserializeNewRpc(
       CallResponse callResponse, String tsUUID) {
 
     final ConnectToMasterResponsePB.Builder respBuilder =
         ConnectToMasterResponsePB.newBuilder();
     readProtobuf(callResponse.getPBMessage(), respBuilder);
-    RaftPeerPB.Role role = RaftPeerPB.Role.FOLLOWER;
-    if (!respBuilder.hasError() || respBuilder.getError().getCode() !=
-        MasterErrorPB.Code.CATALOG_MANAGER_NOT_INITIALIZED) {
-      role = respBuilder.getRole();
-    }
-    ConnectToClusterResponse response = new ConnectToClusterResponse(
-        deadlineTracker.getElapsedMillis(),
-        tsUUID,
-        role);
-    return new Pair<ConnectToClusterResponse, Object>(
-        response, respBuilder.hasError() ? respBuilder.getError() : null);
+    return new Pair<ConnectToMasterResponsePB, Object>(
+        respBuilder.build(),
+        respBuilder.hasError() ? respBuilder.getError() : null);
   }
 
-  private Pair<ConnectToClusterResponse, Object> 
deserializeOldRpc(CallResponse callResponse,
+  private Pair<ConnectToMasterResponsePB, Object> 
deserializeOldRpc(CallResponse callResponse,
       String tsUUID) throws KuduException {
-    final GetMasterRegistrationResponsePB.Builder respBuilder =
+    final GetMasterRegistrationResponsePB.Builder resp =
         GetMasterRegistrationResponsePB.newBuilder();
-    readProtobuf(callResponse.getPBMessage(), respBuilder);
-    RaftPeerPB.Role role = RaftPeerPB.Role.FOLLOWER;
-    if (!respBuilder.hasError() || respBuilder.getError().getCode() !=
-        MasterErrorPB.Code.CATALOG_MANAGER_NOT_INITIALIZED) {
-      role = respBuilder.getRole();
+    readProtobuf(callResponse.getPBMessage(), resp);
+
+    // Translate to the new RPC result type.
+    ConnectToMasterResponsePB.Builder b = 
ConnectToMasterResponsePB.newBuilder();
+    if (resp.hasRole()) {
+      b.setRole(resp.getRole());
+    }
+    if (resp.hasError()) {
+      b.setError(resp.getError());
     }
-    ConnectToClusterResponse response = new ConnectToClusterResponse(
-        deadlineTracker.getElapsedMillis(),
-        tsUUID,
-        role);
-    return new Pair<ConnectToClusterResponse, Object>(
-        response, respBuilder.hasError() ? respBuilder.getError() : null);
+    return new Pair<ConnectToMasterResponsePB, Object>(
+        b.build(),
+        b.hasError() ? b.getError() : null);
   }
 
   public void setUseOldMethod() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/3102e383/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
index 0facf89..30727ed 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
@@ -30,6 +30,7 @@ import com.stumbleupon.async.Callback;
 import org.junit.Test;
 
 import org.apache.kudu.consensus.Metadata;
+import org.apache.kudu.master.Master.ConnectToMasterResponsePB;
 
 public class TestConnectToCluster {
 
@@ -86,45 +87,45 @@ public class TestConnectToCluster {
 
     // Normal case.
     runTest(
-        makeGMRR(LEADER),
-        makeGMRR(FOLLOWER),
-        makeGMRR(FOLLOWER),
+        makeCTMR(LEADER),
+        makeCTMR(FOLLOWER),
+        makeCTMR(FOLLOWER),
         successResponse);
 
     // Permutation works too.
     runTest(
-        makeGMRR(FOLLOWER),
-        makeGMRR(LEADER),
-        makeGMRR(FOLLOWER),
+        makeCTMR(FOLLOWER),
+        makeCTMR(LEADER),
+        makeCTMR(FOLLOWER),
         successResponse);
 
     // Multiple leaders, that's fine since it might be a TOCTOU situation, or 
one master
     // is confused. Raft handles this if the client then tries to do something 
that requires a
     // replication on the master-side.
     runTest(
-        makeGMRR(LEADER),
-        makeGMRR(LEADER),
-        makeGMRR(FOLLOWER),
+        makeCTMR(LEADER),
+        makeCTMR(LEADER),
+        makeCTMR(FOLLOWER),
         successResponse);
 
     // Mixed bag, still works because there's a leader.
     runTest(
         reusableNRE,
-        makeGMRR(FOLLOWER),
-        makeGMRR(LEADER),
+        makeCTMR(FOLLOWER),
+        makeCTMR(LEADER),
         successResponse);
 
     // All unreachable except one leader, still good.
     runTest(
         reusableNRE,
         reusableNRE,
-        makeGMRR(LEADER),
+        makeCTMR(LEADER),
         successResponse);
 
     // Permutation of the previous.
     runTest(
         reusableNRE,
-        makeGMRR(LEADER),
+        makeCTMR(LEADER),
         reusableNRE,
         successResponse);
 
@@ -132,22 +133,22 @@ public class TestConnectToCluster {
 
     // Just followers means we retry.
     runTest(
-        makeGMRR(FOLLOWER),
-        makeGMRR(FOLLOWER),
-        makeGMRR(FOLLOWER),
+        makeCTMR(FOLLOWER),
+        makeCTMR(FOLLOWER),
+        makeCTMR(FOLLOWER),
         retryResponse);
 
     // One NRE but we have responsive masters, retry.
     runTest(
-        makeGMRR(FOLLOWER),
-        makeGMRR(FOLLOWER),
+        makeCTMR(FOLLOWER),
+        makeCTMR(FOLLOWER),
         reusableNRE,
         retryResponse);
 
     // One good master but no leader, retry.
     runTest(
         reusableNRE,
-        makeGMRR(FOLLOWER),
+        makeCTMR(FOLLOWER),
         reusableNRE,
         retryResponse);
 
@@ -155,7 +156,7 @@ public class TestConnectToCluster {
     runTest(
         reusableRE,
         reusableNRE,
-        makeGMRR(FOLLOWER),
+        makeCTMR(FOLLOWER),
         retryResponse);
 
     // All recoverable means retry.
@@ -192,9 +193,9 @@ public class TestConnectToCluster {
 
     ConnectToCluster grrm = new ConnectToCluster(MASTERS);
 
-    Callback<Void, ConnectToClusterResponse> cb0 = 
grrm.callbackForNode(MASTERS.get(0));
-    Callback<Void, ConnectToClusterResponse> cb1 = 
grrm.callbackForNode(MASTERS.get(1));
-    Callback<Void, ConnectToClusterResponse> cb2 = 
grrm.callbackForNode(MASTERS.get(2));
+    Callback<Void, ConnectToMasterResponsePB> cb0 = 
grrm.callbackForNode(MASTERS.get(0));
+    Callback<Void, ConnectToMasterResponsePB> cb1 = 
grrm.callbackForNode(MASTERS.get(1));
+    Callback<Void, ConnectToMasterResponsePB> cb2 = 
grrm.callbackForNode(MASTERS.get(2));
 
     Callback<Void, Exception> eb0 = grrm.errbackForNode(MASTERS.get(0));
     Callback<Void, Exception> eb1 = grrm.errbackForNode(MASTERS.get(1));
@@ -218,17 +219,19 @@ public class TestConnectToCluster {
 
   // Helper method that determines if the callback or errback should be called.
   private static void callTheRightCallback(
-      Callback<Void, ConnectToClusterResponse> cb,
+      Callback<Void, ConnectToMasterResponsePB> cb,
       Callback<Void, Exception> eb,
       Object response) throws Exception {
     if (response instanceof Exception) {
       eb.call((Exception) response);
     } else {
-      cb.call((ConnectToClusterResponse) response);
+      cb.call((ConnectToMasterResponsePB) response);
     }
   }
 
-  private static ConnectToClusterResponse makeGMRR(Metadata.RaftPeerPB.Role 
role) {
-    return new ConnectToClusterResponse(0, "", role);
+  private static ConnectToMasterResponsePB makeCTMR(Metadata.RaftPeerPB.Role 
role) {
+    return ConnectToMasterResponsePB.newBuilder()
+        .setRole(role)
+        .build();
   }
 }

Reply via email to