java: further cleanup of ConnectToCluster

This changes the individual RPCs which go to masters to directly return the ConnectToMasterResponsePB. Then, these responses are aggregated in the ConnectToCluster class, and once a leader is found, a new ConnectToClusterResponse object is created which represents the result of the aggregated multi-master operation.
The code to translate from the ConnectToMasterResponsePB is also moved into the ConnectToClusterResponse class where it's a little more obvious what it's doing, and called from AsyncKuduClient. This also addresses a bunch of references in comments to the old RPC names which got missed in the IDE-driven rename. Change-Id: Ia51175ee1b4108341959d90545e2d6733d5e80d5 Reviewed-on: http://gerrit.cloudera.org:8080/6053 Reviewed-by: Dan Burkert <[email protected]> Reviewed-by: Jean-Daniel Cryans <[email protected]> Tested-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3102e383 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3102e383 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3102e383 Branch: refs/heads/master Commit: 3102e3833bcf32a290b63e8d6cae917b62194ac6 Parents: 66e5756 Author: Todd Lipcon <[email protected]> Authored: Thu Feb 16 23:19:21 2017 -0800 Committer: Todd Lipcon <[email protected]> Committed: Wed Feb 22 04:51:37 2017 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/AsyncKuduClient.java | 13 ++- .../apache/kudu/client/ConnectToCluster.java | 117 ++++++++----------- .../kudu/client/ConnectToClusterResponse.java | 69 ++++++----- .../kudu/client/ConnectToMasterRequest.java | 51 ++++---- .../kudu/client/TestConnectToCluster.java | 57 ++++----- 5 files changed, 149 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/3102e383/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index 4c0d41b..cbb4ad5 100644 --- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -1060,8 +1060,17 @@ public class AsyncKuduClient implements AutoCloseable { * @return An initialized Deferred object to hold the response. */ Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) { - return ConnectToCluster.run(masterAddresses, parentRpc, - connectionCache, defaultAdminOperationTimeoutMs); + return ConnectToCluster.run(masterAddresses, parentRpc, connectionCache, + defaultAdminOperationTimeoutMs) + .addCallback( + new Callback<Master.GetTableLocationsResponsePB, ConnectToClusterResponse>() { + @Override + public Master.GetTableLocationsResponsePB call(ConnectToClusterResponse resp) { + // Once we've connected, we translate the located master into a TableLocations + // since the rest of our locations caching code expects this type. + return resp.getAsTableLocations(); + } + }); } /** http://git-wip-us.apache.org/repos/asf/kudu/blob/3102e383/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java index a3987d5..018b531 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java @@ -28,17 +28,14 @@ import com.google.common.base.Functions; import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.net.HostAndPort; -import com.google.protobuf.ByteString; import com.stumbleupon.async.Callback; import com.stumbleupon.async.Deferred; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kudu.Common; import 
org.apache.kudu.annotations.InterfaceAudience; -import org.apache.kudu.consensus.Metadata; -import org.apache.kudu.master.Master; -import org.apache.kudu.master.Master.GetTableLocationsResponsePB; +import org.apache.kudu.consensus.Metadata.RaftPeerPB.Role; +import org.apache.kudu.master.Master.ConnectToMasterResponsePB; import org.apache.kudu.rpc.RpcHeader.ErrorStatusPB.RpcErrorCodePB; import org.apache.kudu.util.NetUtil; @@ -52,15 +49,13 @@ final class ConnectToCluster { private static final Logger LOG = LoggerFactory.getLogger(ConnectToCluster.class); private final List<HostAndPort> masterAddrs; - private final Deferred<Master.GetTableLocationsResponsePB> responseD; + private final Deferred<ConnectToClusterResponse> responseD; private final int numMasters; // Used to avoid calling 'responseD' twice. private final AtomicBoolean responseDCalled = new AtomicBoolean(false); - // Number of responses we've receives: used to tell whether or not we've received - // errors/replies from all of the masters, or if there are any - // GetMasterRegistrationRequests still pending. + /** Number of responses we've received so far */ private final AtomicInteger countResponsesReceived = new AtomicInteger(0); // Exceptions received so far: kept for debugging purposes. @@ -71,8 +66,6 @@ final class ConnectToCluster { * Creates an object that holds the state needed to retrieve master table's location. * @param masterAddrs Addresses of all master replicas that we want to retrieve the * registration from. - * @param responseD Deferred object that will hold the GetTableLocationsResponsePB object for - * the master table. 
*/ ConnectToCluster(List<HostAndPort> masterAddrs) { this.masterAddrs = masterAddrs; @@ -80,11 +73,12 @@ final class ConnectToCluster { this.numMasters = masterAddrs.size(); } - public Deferred<GetTableLocationsResponsePB> getDeferred() { + @VisibleForTesting + public Deferred<ConnectToClusterResponse> getDeferred() { return responseD; } - private static Deferred<ConnectToClusterResponse> getMasterRegistration( + private static Deferred<ConnectToMasterResponsePB> connectToMaster( final TabletClient masterClient, KuduRpc<?> parentRpc, long defaultTimeoutMs) { // TODO: Handle the situation when multiple in-flight RPCs all want to query the masters, @@ -96,15 +90,15 @@ final class ConnectToCluster { } else { rpc.setTimeoutMillis(defaultTimeoutMs); } - Deferred<ConnectToClusterResponse> d = rpc.getDeferred(); + Deferred<ConnectToMasterResponsePB> d = rpc.getDeferred(); rpc.attempt++; masterClient.sendRpc(rpc); - // If we are connecting to an older version of Kudu, we'll get a NO_SUCH_METHOD + // If we are connecting to an older version of Kudu, we'll get an invalid request // error. In that case, we resend using the older version of the RPC. 
- d.addErrback(new Callback<Deferred<ConnectToClusterResponse>, Exception>() { + d.addErrback(new Callback<Deferred<ConnectToMasterResponsePB>, Exception>() { @Override - public Deferred<ConnectToClusterResponse> call(Exception result) + public Deferred<ConnectToMasterResponsePB> call(Exception result) throws Exception { if (result instanceof RpcRemoteException) { RpcRemoteException rre = (RpcRemoteException)result; @@ -112,7 +106,7 @@ final class ConnectToCluster { rre.getErrPB().getUnsupportedFeatureFlagsCount() > 0) { AsyncKuduClient.LOG.debug("Falling back to GetMasterRegistration() RPC to connect " + "to server running Kudu < 1.3."); - Deferred<ConnectToClusterResponse> newAttempt = rpc.getDeferred(); + Deferred<ConnectToMasterResponsePB> newAttempt = rpc.getDeferred(); assert newAttempt != null; rpc.setUseOldMethod(); masterClient.sendRpc(rpc); @@ -127,17 +121,17 @@ final class ConnectToCluster { } /** - * Retrieve the master registration (see {@link ConnectToClusterResponse} - * from the leader master. + * Locate the leader master and retrieve the cluster information + * (see {@link ConnectToClusterResponse}. * * @param masterAddresses the addresses of masters to fetch from * @param parentRpc RPC that prompted a master lookup, can be null * @param connCache the client's connection cache, used for creating connections * to masters * @param defaultTimeoutMs timeout to use for RPCs if the parentRpc has no timeout - * @return a Deferred object for the master replica's current registration + * @return a Deferred object for the cluster connection status */ - public static Deferred<GetTableLocationsResponsePB> run( + public static Deferred<ConnectToClusterResponse> run( List<HostAndPort> masterAddresses, KuduRpc<?> parentRpc, ConnectionCache connCache, @@ -148,7 +142,7 @@ final class ConnectToCluster { // waits until it gets a good response before firing the returned // deferred. 
for (HostAndPort hostAndPort : masterAddresses) { - Deferred<ConnectToClusterResponse> d; + Deferred<ConnectToMasterResponsePB> d; TabletClient client = connCache.newMasterClient(hostAndPort); if (client == null) { String message = "Couldn't resolve this master's address " + hostAndPort.toString(); @@ -156,7 +150,7 @@ final class ConnectToCluster { Status statusIOE = Status.IOError(message); d = Deferred.fromError(new NonRecoverableException(statusIOE)); } else { - d = getMasterRegistration(client, parentRpc, defaultTimeoutMs); + d = connectToMaster(client, parentRpc, defaultTimeoutMs); } d.addCallbacks(connector.callbackForNode(hostAndPort), connector.errbackForNode(hostAndPort)); @@ -165,32 +159,32 @@ final class ConnectToCluster { } /** - * Creates a callback for a GetMasterRegistrationRequest that was sent to 'hostAndPort'. - * @see GetMasterRegistrationCB + * Creates a callback for a ConnectToMaster RPC that was sent to 'hostAndPort'. + * @see ConnectToMasterCB * @param hostAndPort Host and part for the RPC we're attaching this to. Host and port must * be valid. * @return The callback object that can be added to the RPC request. */ @VisibleForTesting - Callback<Void, ConnectToClusterResponse> callbackForNode(HostAndPort hostAndPort) { - return new GetMasterRegistrationCB(hostAndPort); + Callback<Void, ConnectToMasterResponsePB> callbackForNode(HostAndPort hostAndPort) { + return new ConnectToMasterCB(hostAndPort); } /** - * Creates an errback for a GetMasterRegistrationRequest that was sent to 'hostAndPort'. - * @see GetMasterRegistrationErrCB + * Creates an errback for a ConnectToMaster that was sent to 'hostAndPort'. + * @see ConnectToMasterErrCB * @param hostAndPort Host and port for the RPC we're attaching this to. Used for debugging * purposes. * @return The errback object that can be added to the RPC request. 
*/ @VisibleForTesting Callback<Void, Exception> errbackForNode(HostAndPort hostAndPort) { - return new GetMasterRegistrationErrCB(hostAndPort); + return new ConnectToMasterErrCB(hostAndPort); } /** * Checks if we've already received a response or an exception from every master that - * we've sent a GetMasterRegistrationRequest to. If so -- and no leader has been found + * we've sent a ConnectToMaster to. If so -- and no leader has been found * (that is, 'responseD' was never called) -- pass a {@link NoLeaderFoundException} * to responseD. */ @@ -250,7 +244,7 @@ final class ConnectToCluster { } /** - * Callback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above. + * Callback for each ConnectToCluster RPC sent in connectToMaster() above. * If a request (paired to a specific master) returns a reply that indicates it's a leader, * the callback in 'responseD' is invoked with an initialized GetTableLocationResponsePB * object containing the leader's RPC address. @@ -258,65 +252,50 @@ final class ConnectToCluster { * the number of masters, pass {@link NoLeaderFoundException} into * 'responseD' if no one else had called 'responseD' before; otherwise, do nothing. 
*/ - final class GetMasterRegistrationCB implements Callback<Void, ConnectToClusterResponse> { + final class ConnectToMasterCB implements Callback<Void, ConnectToMasterResponsePB> { private final HostAndPort hostAndPort; - public GetMasterRegistrationCB(HostAndPort hostAndPort) { + public ConnectToMasterCB(HostAndPort hostAndPort) { this.hostAndPort = hostAndPort; } @Override - public Void call(ConnectToClusterResponse r) throws Exception { - Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder = - Master.TabletLocationsPB.ReplicaPB.newBuilder(); - - Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder() - .addRpcAddresses(ProtobufHelper.hostAndPortToPB(hostAndPort)) - .setPermanentUuid(ByteString.EMPTY); // required field, but unused for master - - replicaBuilder.setTsInfo(tsInfoBuilder); - if (r.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) { - replicaBuilder.setRole(r.getRole()); - Master.TabletLocationsPB.Builder locationBuilder = Master.TabletLocationsPB.newBuilder(); - locationBuilder.setPartition( - Common.PartitionPB.newBuilder().setPartitionKeyStart(ByteString.EMPTY) - .setPartitionKeyEnd(ByteString.EMPTY)); - locationBuilder.setTabletId( - ByteString.copyFromUtf8(AsyncKuduClient.MASTER_TABLE_NAME_PLACEHOLDER)); - locationBuilder.addReplicas(replicaBuilder); - // No one else has called this before us. - if (responseDCalled.compareAndSet(false, true)) { - responseD.callback( - Master.GetTableLocationsResponsePB.newBuilder().addTabletLocations( - locationBuilder.build()).build() - ); - } else { - LOG.debug("Callback already invoked, discarding response(" + r.toString() + ") from " + - hostAndPort.toString()); - } - } else { + public Void call(ConnectToMasterResponsePB r) throws Exception { + if (!r.getRole().equals(Role.LEADER)) { incrementCountAndCheckExhausted(); + return null; } + // We found a leader! + if (!responseDCalled.compareAndSet(false, true)) { + // Someone else already found a leader. 
This is somewhat unexpected + // because this means two nodes think they're the leader, but it's + // not impossible. We'll just ignore it. + LOG.debug("Callback already invoked, discarding response(" + r.toString() + ") from " + + hostAndPort.toString()); + return null; + } + + responseD.callback(new ConnectToClusterResponse(hostAndPort, r)); return null; } @Override public String toString() { - return "get master registration for " + hostAndPort.toString(); + return "ConnectToMasterCB for " + hostAndPort.toString(); } } /** - * Errback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above. + * Errback for each ConnectToMaster RPC sent in connectToMaster() above. * Stores each exception in 'exceptionsReceived'. Increments 'countResponseReceived': if * the count is equal to the number of masters and no one else had called 'responseD' before, * pass a {@link NoLeaderFoundException} into 'responseD'; otherwise, do * nothing. */ - final class GetMasterRegistrationErrCB implements Callback<Void, Exception> { + final class ConnectToMasterErrCB implements Callback<Void, Exception> { private final HostAndPort hostAndPort; - public GetMasterRegistrationErrCB(HostAndPort hostAndPort) { + public ConnectToMasterErrCB(HostAndPort hostAndPort) { this.hostAndPort = hostAndPort; } @@ -330,7 +309,7 @@ final class ConnectToCluster { @Override public String toString() { - return "get master registration errback for " + hostAndPort.toString(); + return "ConnectToMasterErrCB for " + hostAndPort.toString(); } } } http://git-wip-us.apache.org/repos/asf/kudu/blob/3102e383/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java index 59cb306..50b483e 100644 --- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java @@ -17,44 +17,53 @@ package org.apache.kudu.client; -import org.apache.kudu.annotations.InterfaceAudience; -import org.apache.kudu.consensus.Metadata; -import org.apache.kudu.master.Master; +import com.google.common.net.HostAndPort; +import com.google.protobuf.ByteString; + +import org.apache.kudu.Common.PartitionPB; +import org.apache.kudu.master.Master.ConnectToMasterResponsePB; +import org.apache.kudu.master.Master.GetTableLocationsResponsePB; +import org.apache.kudu.master.Master.TSInfoPB; +import org.apache.kudu.master.Master.TabletLocationsPB; +import org.apache.kudu.master.Master.TabletLocationsPB.ReplicaPB; /** - * Response for {@link ConnectToMasterRequest}. + * The aggregated response after connecting to a cluster. This stores the + * identity of the leader master as well as the response from that master. */ [email protected] -public class ConnectToClusterResponse extends KuduRpcResponse { +class ConnectToClusterResponse { + private static final ByteString FAKE_TABLET_ID = ByteString.copyFromUtf8( + AsyncKuduClient.MASTER_TABLE_NAME_PLACEHOLDER); - private final Metadata.RaftPeerPB.Role role; + /** The host and port of the master that is currently leader */ + private final HostAndPort leaderHostAndPort; + /** The response from that master */ + private final ConnectToMasterResponsePB connectResponse; - /** - * Describes a response to a {@link ConnectToMasterRequest}, built from - * {@link Master.GetMasterRegistrationResponsePB}. - * - * @param role Master's role in the config. 
- */ - public ConnectToClusterResponse(long elapsedMillis, String tsUUID, - Metadata.RaftPeerPB.Role role) { - super(elapsedMillis, tsUUID); - this.role = role; + + public ConnectToClusterResponse(HostAndPort hostAndPort, + ConnectToMasterResponsePB connectResponse) { + super(); + this.leaderHostAndPort = hostAndPort; + this.connectResponse = connectResponse; } /** - * Returns this master's role in the config. - * - * @see Metadata.RaftPeerPB.Role - * @return Node's role in the cluster, or FOLLOWER if the node is not initialized. + * Return the location of the located leader master as if this had been a normal + * tablet lookup. This is necessary so that we can cache the master location as + * if it were a tablet. */ - public Metadata.RaftPeerPB.Role getRole() { - return role; - } - - @Override - public String toString() { - return "GetMasterRegistrationResponse{" + - "role=" + role + - '}'; + public GetTableLocationsResponsePB getAsTableLocations() { + return GetTableLocationsResponsePB.newBuilder() + .addTabletLocations(TabletLocationsPB.newBuilder() + .setPartition(PartitionPB.newBuilder() + .setPartitionKeyStart(ByteString.EMPTY) + .setPartitionKeyEnd(ByteString.EMPTY)) + .setTabletId(FAKE_TABLET_ID) + .addReplicas(ReplicaPB.newBuilder() + .setTsInfo(TSInfoPB.newBuilder() + .addRpcAddresses(ProtobufHelper.hostAndPortToPB(leaderHostAndPort)) + .setPermanentUuid(ByteString.EMPTY)) // required field, but unused for master + .setRole(connectResponse.getRole()))).build(); } } http://git-wip-us.apache.org/repos/asf/kudu/blob/3102e383/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java index 6ff06e3..00e4537 100644 --- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java @@ -17,10 +17,8 @@ package org.apache.kudu.client; -import static org.apache.kudu.consensus.Metadata.RaftPeerPB; import static org.apache.kudu.master.Master.GetMasterRegistrationRequestPB; import static org.apache.kudu.master.Master.GetMasterRegistrationResponsePB; -import static org.apache.kudu.master.Master.MasterErrorPB; import java.util.Collection; import java.util.Collections; @@ -36,7 +34,7 @@ import org.apache.kudu.util.Pair; * Package-private RPC that can only go to master. */ @InterfaceAudience.Private -public class ConnectToMasterRequest extends KuduRpc<ConnectToClusterResponse> { +public class ConnectToMasterRequest extends KuduRpc<ConnectToMasterResponsePB> { /** * Kudu 1.2 and earlier use GetMasterRegistration to connect to the master. */ @@ -81,49 +79,42 @@ public class ConnectToMasterRequest extends KuduRpc<ConnectToClusterResponse> { } @Override - Pair<ConnectToClusterResponse, Object> deserialize(CallResponse callResponse, - String tsUUID) throws KuduException { + Pair<ConnectToMasterResponsePB, Object> deserialize(CallResponse callResponse, + String tsUUID) throws KuduException { if (method == CONNECT_TO_MASTER) { return deserializeNewRpc(callResponse, tsUUID); } return deserializeOldRpc(callResponse, tsUUID); } - private Pair<ConnectToClusterResponse, Object> deserializeNewRpc( + private Pair<ConnectToMasterResponsePB, Object> deserializeNewRpc( CallResponse callResponse, String tsUUID) { final ConnectToMasterResponsePB.Builder respBuilder = ConnectToMasterResponsePB.newBuilder(); readProtobuf(callResponse.getPBMessage(), respBuilder); - RaftPeerPB.Role role = RaftPeerPB.Role.FOLLOWER; - if (!respBuilder.hasError() || respBuilder.getError().getCode() != - MasterErrorPB.Code.CATALOG_MANAGER_NOT_INITIALIZED) { - role = respBuilder.getRole(); - } - ConnectToClusterResponse 
response = new ConnectToClusterResponse( - deadlineTracker.getElapsedMillis(), - tsUUID, - role); - return new Pair<ConnectToClusterResponse, Object>( - response, respBuilder.hasError() ? respBuilder.getError() : null); + return new Pair<ConnectToMasterResponsePB, Object>( + respBuilder.build(), + respBuilder.hasError() ? respBuilder.getError() : null); } - private Pair<ConnectToClusterResponse, Object> deserializeOldRpc(CallResponse callResponse, + private Pair<ConnectToMasterResponsePB, Object> deserializeOldRpc(CallResponse callResponse, String tsUUID) throws KuduException { - final GetMasterRegistrationResponsePB.Builder respBuilder = + final GetMasterRegistrationResponsePB.Builder resp = GetMasterRegistrationResponsePB.newBuilder(); - readProtobuf(callResponse.getPBMessage(), respBuilder); - RaftPeerPB.Role role = RaftPeerPB.Role.FOLLOWER; - if (!respBuilder.hasError() || respBuilder.getError().getCode() != - MasterErrorPB.Code.CATALOG_MANAGER_NOT_INITIALIZED) { - role = respBuilder.getRole(); + readProtobuf(callResponse.getPBMessage(), resp); + + // Translate to the new RPC result type. + ConnectToMasterResponsePB.Builder b = ConnectToMasterResponsePB.newBuilder(); + if (resp.hasRole()) { + b.setRole(resp.getRole()); + } + if (resp.hasError()) { + b.setError(resp.getError()); } - ConnectToClusterResponse response = new ConnectToClusterResponse( - deadlineTracker.getElapsedMillis(), - tsUUID, - role); - return new Pair<ConnectToClusterResponse, Object>( - response, respBuilder.hasError() ? respBuilder.getError() : null); + return new Pair<ConnectToMasterResponsePB, Object>( + b.build(), + b.hasError() ? 
b.getError() : null); } public void setUseOldMethod() { http://git-wip-us.apache.org/repos/asf/kudu/blob/3102e383/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java index 0facf89..30727ed 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java @@ -30,6 +30,7 @@ import com.stumbleupon.async.Callback; import org.junit.Test; import org.apache.kudu.consensus.Metadata; +import org.apache.kudu.master.Master.ConnectToMasterResponsePB; public class TestConnectToCluster { @@ -86,45 +87,45 @@ public class TestConnectToCluster { // Normal case. runTest( - makeGMRR(LEADER), - makeGMRR(FOLLOWER), - makeGMRR(FOLLOWER), + makeCTMR(LEADER), + makeCTMR(FOLLOWER), + makeCTMR(FOLLOWER), successResponse); // Permutation works too. runTest( - makeGMRR(FOLLOWER), - makeGMRR(LEADER), - makeGMRR(FOLLOWER), + makeCTMR(FOLLOWER), + makeCTMR(LEADER), + makeCTMR(FOLLOWER), successResponse); // Multiple leaders, that's fine since it might be a TOCTOU situation, or one master // is confused. Raft handles this if the client then tries to do something that requires a // replication on the master-side. runTest( - makeGMRR(LEADER), - makeGMRR(LEADER), - makeGMRR(FOLLOWER), + makeCTMR(LEADER), + makeCTMR(LEADER), + makeCTMR(FOLLOWER), successResponse); // Mixed bag, still works because there's a leader. runTest( reusableNRE, - makeGMRR(FOLLOWER), - makeGMRR(LEADER), + makeCTMR(FOLLOWER), + makeCTMR(LEADER), successResponse); // All unreachable except one leader, still good. runTest( reusableNRE, reusableNRE, - makeGMRR(LEADER), + makeCTMR(LEADER), successResponse); // Permutation of the previous. 
runTest( reusableNRE, - makeGMRR(LEADER), + makeCTMR(LEADER), reusableNRE, successResponse); @@ -132,22 +133,22 @@ public class TestConnectToCluster { // Just followers means we retry. runTest( - makeGMRR(FOLLOWER), - makeGMRR(FOLLOWER), - makeGMRR(FOLLOWER), + makeCTMR(FOLLOWER), + makeCTMR(FOLLOWER), + makeCTMR(FOLLOWER), retryResponse); // One NRE but we have responsive masters, retry. runTest( - makeGMRR(FOLLOWER), - makeGMRR(FOLLOWER), + makeCTMR(FOLLOWER), + makeCTMR(FOLLOWER), reusableNRE, retryResponse); // One good master but no leader, retry. runTest( reusableNRE, - makeGMRR(FOLLOWER), + makeCTMR(FOLLOWER), reusableNRE, retryResponse); @@ -155,7 +156,7 @@ public class TestConnectToCluster { runTest( reusableRE, reusableNRE, - makeGMRR(FOLLOWER), + makeCTMR(FOLLOWER), retryResponse); // All recoverable means retry. @@ -192,9 +193,9 @@ public class TestConnectToCluster { ConnectToCluster grrm = new ConnectToCluster(MASTERS); - Callback<Void, ConnectToClusterResponse> cb0 = grrm.callbackForNode(MASTERS.get(0)); - Callback<Void, ConnectToClusterResponse> cb1 = grrm.callbackForNode(MASTERS.get(1)); - Callback<Void, ConnectToClusterResponse> cb2 = grrm.callbackForNode(MASTERS.get(2)); + Callback<Void, ConnectToMasterResponsePB> cb0 = grrm.callbackForNode(MASTERS.get(0)); + Callback<Void, ConnectToMasterResponsePB> cb1 = grrm.callbackForNode(MASTERS.get(1)); + Callback<Void, ConnectToMasterResponsePB> cb2 = grrm.callbackForNode(MASTERS.get(2)); Callback<Void, Exception> eb0 = grrm.errbackForNode(MASTERS.get(0)); Callback<Void, Exception> eb1 = grrm.errbackForNode(MASTERS.get(1)); @@ -218,17 +219,19 @@ public class TestConnectToCluster { // Helper method that determines if the callback or errback should be called. 
private static void callTheRightCallback( - Callback<Void, ConnectToClusterResponse> cb, + Callback<Void, ConnectToMasterResponsePB> cb, Callback<Void, Exception> eb, Object response) throws Exception { if (response instanceof Exception) { eb.call((Exception) response); } else { - cb.call((ConnectToClusterResponse) response); + cb.call((ConnectToMasterResponsePB) response); } } - private static ConnectToClusterResponse makeGMRR(Metadata.RaftPeerPB.Role role) { - return new ConnectToClusterResponse(0, "", role); + private static ConnectToMasterResponsePB makeCTMR(Metadata.RaftPeerPB.Role role) { + return ConnectToMasterResponsePB.newBuilder() + .setRole(role) + .build(); } }
