KUDU-2343. java: properly reconnect to new leader master after failover

This fixes the "fake" location information returned in response to a
ConnectToMaster RPC to include a distinct "fake UUID" for each master.
Previously, we were using an empty string for the UUID of the masters.
This caused collisions in the ConnectionCache, which is keyed by server
UUIDs.

The fake UUID added by this patch matches the fake UUID already in use
by AsyncKuduClient.newMasterRpcProxy. This should allow us to share the
RPC connection between the ConnectToMaster RPCs and the subsequent
GetTableLocations RPCs, which also reduces latency after a failover or
on a fresh client.

Additionally, this will help with various log messages that previously
would print an empty UUID string.

A prior version of this patch solved the problem by changing the key for
the ConnectionCache to be based on IP address, which has other benefits
in terms of future support for servers changing their DNS resolution at
runtime. However, since this patch is intended for backport into prior
releases, this simpler approach is taken for now. A TODO is added for
the longer-term idea.

An existing test which tested killing a master now also runs in a second
mode which restarts the master. The new mode reproduced the bug prior to
the fix. This patch also cleans up that test somewhat: its loop was meant
to kill more than one tablet server, but in fact it just called
"killTabletServerOnHostPort" three times on the same one. Killing three
tablet servers never made sense anyway, since the table in the test only
had three replicas. Nor did it make sense to start six tablet servers
for the test.

Change-Id: I36f96c6712800e398ed46887d97d4b09fd993b04
Reviewed-on: http://gerrit.cloudera.org:8080/9612
Reviewed-by: Alexey Serbin <aser...@cloudera.com>
Tested-by: Todd Lipcon <t...@apache.org>
(cherry picked from commit 487a21476ba3551b3a7ec98cf96f772a495f31fb)
Reviewed-on: http://gerrit.cloudera.org:8080/9615
Tested-by: Kudu Jenkins
Reviewed-by: Grant Henke <granthe...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/29d86fc8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/29d86fc8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/29d86fc8

Branch: refs/heads/branch-1.7.x
Commit: 29d86fc83c56c5a8ff418d01e958e887f42f3597
Parents: 65fd3d1
Author: Todd Lipcon <t...@apache.org>
Authored: Tue Mar 13 11:43:47 2018 -0700
Committer: Grant Henke <granthe...@gmail.com>
Committed: Wed Mar 14 01:15:50 2018 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  6 +-
 .../kudu/client/ConnectToClusterResponse.java   |  3 +-
 .../org/apache/kudu/client/ConnectionCache.java |  3 +
 .../org/apache/kudu/client/MiniKuduCluster.java |  1 +
 .../kudu/client/TestClientFailoverSupport.java  | 65 +++++++++++++++-----
 .../apache/kudu/client/TestConnectionCache.java |  6 +-
 6 files changed, 65 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/29d86fc8/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 0c7e800..4b93a34 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -406,7 +406,11 @@ public class AsyncKuduClient implements AutoCloseable {
       return null;
     }
     return newRpcProxy(
-        new ServerInfo("master-" + hostPort.toString(), hostPort, inetAddress), credentialsPolicy);
+        new ServerInfo(getFakeMasterUuid(hostPort), hostPort, inetAddress), credentialsPolicy);
+  }
+
+  static String getFakeMasterUuid(HostAndPort hostPort) {
+    return "master-" + hostPort.toString();
   }
 
   void reconnectToCluster(Callback<Void, Boolean> cb,

http://git-wip-us.apache.org/repos/asf/kudu/blob/29d86fc8/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
index 6cb687f..bfa1f2b 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
@@ -61,6 +61,7 @@ class ConnectToClusterResponse {
    * if it were a tablet.
    */
   public GetTableLocationsResponsePB getAsTableLocations() {
+    String fakeUuid = AsyncKuduClient.getFakeMasterUuid(leaderHostAndPort);
     return GetTableLocationsResponsePB.newBuilder()
         .addTabletLocations(TabletLocationsPB.newBuilder()
             .setPartition(PartitionPB.newBuilder()
@@ -70,7 +71,7 @@ class ConnectToClusterResponse {
             .addReplicas(ReplicaPB.newBuilder()
                 .setTsInfo(TSInfoPB.newBuilder()
                     
.addRpcAddresses(ProtobufHelper.hostAndPortToPB(leaderHostAndPort))
-                    .setPermanentUuid(ByteString.EMPTY)) // required field, but unused for master
+                    .setPermanentUuid(ByteString.copyFromUtf8(fakeUuid)))
                 .setRole(connectResponse.getRole()))).build();
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/29d86fc8/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 53e8e59..db37425 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -68,6 +68,9 @@ class ConnectionCache {
    * Container mapping server UUID into the established connection from the 
client to the server.
    * It may be up to two connections per server: one established with 
secondary credentials
    * (e.g. authn token), another with primary ones (e.g. Kerberos credentials).
+   *
+   * TODO(todd) it would make more sense to key this by IP address rather than 
by UUID in
+   * case a server actually changes address and re-registers to the cluster.
    */
   @GuardedBy("uuid2connection")
   private final HashMultimap<String, Connection> uuid2connection = 
HashMultimap.create();

http://git-wip-us.apache.org/repos/asf/kudu/blob/29d86fc8/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index e7a315f..b4402f4 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -292,6 +292,7 @@ public class MiniKuduCluster implements AutoCloseable {
     if (!d.isRunning) {
       return;
     }
+    LOG.info("Killing tserver {}", hp);
     sendRequestToCluster(ControlShellRequestPB.newBuilder()
         .setStopDaemon(StopDaemonRequestPB.newBuilder().setId(d.id).build())
         .build());

http://git-wip-us.apache.org/repos/asf/kudu/blob/29d86fc8/java/kudu-client/src/test/java/org/apache/kudu/client/TestClientFailoverSupport.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestClientFailoverSupport.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestClientFailoverSupport.java
index 2eeb0cc..6f8f51f 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestClientFailoverSupport.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestClientFailoverSupport.java
@@ -20,8 +20,10 @@ import static 
org.apache.kudu.util.AssertHelpers.assertEventuallyTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
+import java.io.IOException;
 import java.util.List;
 import org.apache.kudu.util.AssertHelpers.BooleanExpression;
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -29,12 +31,23 @@ import com.google.common.net.HostAndPort;
 
 public class TestClientFailoverSupport extends BaseKuduTest {
 
+  enum MasterFailureType {
+    RESTART,
+    KILL
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    final int NUM_TABLET_SERVERS = 6;
+    final int NUM_TABLET_SERVERS = 3;
     BaseKuduTest.doSetup(3, NUM_TABLET_SERVERS);
   }
 
+  @After
+  public void restartKilledMaster() throws IOException {
+    miniCluster.restartDeadMasters();
+    miniCluster.restartDeadTservers();
+  }
+
   private void waitUntilRowCount(final KuduTable table, final int rowCount, 
long timeoutMs)
       throws Exception {
     assertEventuallyTrue(String.format("Read count should be %s", rowCount),
@@ -48,25 +61,35 @@ public class TestClientFailoverSupport extends BaseKuduTest 
{
         }, timeoutMs);
   }
 
+  @Test(timeout = 100000)
+  public void testRestartLeaderMaster() throws Exception {
+    doTestMasterFailover(MasterFailureType.RESTART);
+  }
+
+  @Test(timeout = 100000)
+  public void testKillLeaderMaster() throws Exception {
+    doTestMasterFailover(MasterFailureType.KILL);
+  }
+
   /**
    * Tests that the Java client will appropriately failover when a new master 
leader is elected.
-   * We force a metadata update by killing the active tablet servers.
-   * Then we kill the master leader.
-   * We write some more rows.
+   *
+   * We inject some failure on the master, based on 'failureType'. Then we 
force a tablet
+   * re-election by killing the leader replica. The client then needs to 
reconnect to the masters
+   * to find the new location information.
+   *
    * If we can successfully read back the rows written, that shows the client 
handled the failover
    * correctly.
    */
-  @Test(timeout = 100000)
-  public void testMultipleFailover() throws Exception {
-    final String TABLE_NAME = TestClientFailoverSupport.class.getName();
-
+  private void doTestMasterFailover(MasterFailureType failureType) throws 
Exception {
+    final String TABLE_NAME = TestClientFailoverSupport.class.getName()
+        + "-" + failureType;
     createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
 
     KuduTable table = openTable(TABLE_NAME);
     KuduSession session = syncClient.newSession();
 
     final int TOTAL_ROWS_TO_INSERT = 10;
-    final int TSERVER_LEADERS_TO_KILL = 3;
 
     for (int i = 0; i < TOTAL_ROWS_TO_INSERT; i++) {
       session.apply(createBasicSchemaInsert(table, i));
@@ -74,14 +97,26 @@ public class TestClientFailoverSupport extends BaseKuduTest 
{
 
     waitUntilRowCount(table, TOTAL_ROWS_TO_INSERT, DEFAULT_SLEEP);
 
-    for (int i = 0; i < TSERVER_LEADERS_TO_KILL; i++) {
-      List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
-      assertEquals(1, tablets.size());
-      HostAndPort hp = findLeaderTabletServerHostPort(tablets.get(0));
-      miniCluster.killTabletServerOnHostPort(hp);
+    // Kill or restart the leader master.
+    switch (failureType) {
+    case KILL:
+      killMasterLeader();
+      break;
+    case RESTART:
+      restartLeaderMaster();
+      break;
     }
-    killMasterLeader();
 
+    // Kill the tablet server leader. This will force us to go back to the
+    // master to find the new location. At that point, the client will
+    // notice that the old leader master is no longer current and fail over
+    // to the new one.
+    List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
+    assertEquals(1, tablets.size());
+    HostAndPort hp = findLeaderTabletServerHostPort(tablets.get(0));
+    miniCluster.killTabletServerOnHostPort(hp);
+
+    // Insert some more rows.
     for (int i = TOTAL_ROWS_TO_INSERT; i < 2*TOTAL_ROWS_TO_INSERT; i++) {
       session.apply(createBasicSchemaInsert(table, i));
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/29d86fc8/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index 5b5d4cf..6759d59 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -62,8 +62,10 @@ public class TestConnectionCache {
         pingConnection(h);
       }
 
-      // 1 tserver and 3 masters and 3 connections from the newRpcProxy() in 
the loop above.
-      assertEquals(1 + 3 + 3, client.getConnectionListCopy().size());
+      // 3 masters and 3 connections from the newRpcProxy() in the loop above.
+      // No tservers have been connected to by the client since we haven't 
accessed
+      // any data.
+      assertEquals(3 + 3, client.getConnectionListCopy().size());
       assertFalse(allConnectionsTerminated(client));
 
       final RpcProxy proxy = client.newRpcProxy(serverInfos.get(0));

Reply via email to