This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new a50aa5bfd [java] client update master locations cache
a50aa5bfd is described below
commit a50aa5bfdcebd9e842a357e47caa307e5db2bae0
Author: zhangyifan27 <[email protected]>
AuthorDate: Fri Jul 1 20:31:24 2022 +0800
[java] client update master locations cache
Recently a master in our cluster went down because of network issues, and
somehow the server didn't close its connections to some clients. These
clients then kept trying to connect to the dead master but couldn't receive
a response until the RPC timed out. Even after the server came back up, the
clients still sent RPCs through the old channel and could connect to neither
that server nor the new leader master. The only solution was to restart the
clients.
This patch fixes the issue that the Java client can't invalidate stale
locations of the leader master. In this case we may also need a better way
to trigger connection shutdown for an inactive channel.
Change-Id: Ia2877518866ac4c2d1dda6427ce57d08df48a864
Reviewed-on: http://gerrit.cloudera.org:8080/18703
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
.../kudu/client/ConnectToClusterResponse.java | 7 ++++++
.../apache/kudu/client/TableLocationsCache.java | 8 +++++--
.../kudu/client/TestTableLocationsCache.java | 27 ++++++++++++++++++++++
src/kudu/master/master.proto | 2 +-
4 files changed, 41 insertions(+), 3 deletions(-)
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
index 18421a664..9b965f718 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
@@ -34,6 +34,12 @@ class ConnectToClusterResponse {
private static final ByteString FAKE_TABLET_ID = ByteString.copyFromUtf8(
AsyncKuduClient.MASTER_TABLE_NAME_PLACEHOLDER);
+ /**
+ * If the client caches master locations, the entries should not live longer
+ * than this timeout. Defaults to one hour.
+ */
+ private static final int CACHE_TTL_MS = 60 * 60 * 1000;
+
/** The host and port of the master that is currently leader */
private final HostAndPort leaderHostAndPort;
/** The response from that master */
@@ -73,6 +79,7 @@ class ConnectToClusterResponse {
.addTsInfos(TSInfoPB.newBuilder()
.addRpcAddresses(ProtobufHelper.hostAndPortToPB(leaderHostAndPort))
.setPermanentUuid(ByteString.copyFromUtf8(fakeUuid)))
+ .setTtlMillis(CACHE_TTL_MS)
.build();
}
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
index a138dbd0a..788c1f997 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
@@ -62,7 +62,11 @@ class TableLocationsCache {
rwl.readLock().lock();
try {
Preconditions.checkState(entries.size() <= 1);
- return entries.get(AsyncKuduClient.EMPTY_ARRAY);
+ TableLocationsCache.Entry entry =
entries.get(AsyncKuduClient.EMPTY_ARRAY);
+ if (entry.isStale()) {
+ return null;
+ }
+ return entry;
} finally {
rwl.readLock().unlock();
}
@@ -105,7 +109,7 @@ class TableLocationsCache {
if (requestPartitionKey == null) {
// Master lookup.
Preconditions.checkArgument(tablets.size() == 1);
- Entry entry = Entry.tablet(tablets.get(0), TimeUnit.DAYS.toMillis(1));
+ Entry entry = Entry.tablet(tablets.get(0), deadline);
rwl.writeLock().lock();
try {
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTableLocationsCache.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTableLocationsCache.java
index 0f0ce1a12..606952094 100644
---
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTableLocationsCache.java
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTableLocationsCache.java
@@ -98,4 +98,31 @@ public class TestTableLocationsCache {
cache.cacheTabletLocations(tablets, AsyncKuduClient.EMPTY_ARRAY, 1, 100);
assertNotNull(cache.get(AsyncKuduClient.EMPTY_ARRAY));
}
+
+ // Test for checking client will not get expired tablet locations.
+ @Test(timeout = 30000)
+ public void testTTLForTableLocationsCache() {
+ final int TTL_MS = 100;
+ byte[] partitionKey = "start_key".getBytes(StandardCharsets.UTF_8);
+ List<RemoteTablet> tablets = ImmutableList.of(
+ TestRemoteTablet.getTablet(0, 1, -1, partitionKey,
AsyncKuduClient.EMPTY_ARRAY));
+ cache.cacheTabletLocations(tablets, partitionKey, 1, TTL_MS);
+ assertNotNull(cache.get(partitionKey));
+ // Mock as if the time increased by 100ms.
+ Mockito.when(TableLocationsCache.ticker.read()).thenReturn(TTL_MS *
1000000L);
+ assertNull(cache.get(partitionKey));
+ }
+
+ // Test for checking client will not get expired master locations.
+ @Test(timeout = 30000)
+ public void testTTLForMasterLocationsCache() {
+ final int TTL_MS = 100;
+ List<RemoteTablet> masterTablets =
+ ImmutableList.of(TestRemoteTablet.getTablet(0, 1, -1));
+ cache.cacheTabletLocations(masterTablets, null, 1, TTL_MS);
+ assertNotNull(cache.get(null));
+ // Mock as if the time increased by 100ms.
+ Mockito.when(TableLocationsCache.ticker.read()).thenReturn(TTL_MS *
1000000L);
+ assertNull(cache.get(null));
+ }
}
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index afab96ef2..f3f65657a 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -692,7 +692,7 @@ message GetTableLocationsResponsePB {
// If the client caches table locations, the entries should not live longer
// than this timeout. Defaults to one hour.
- optional uint32 ttl_millis = 3 [default = 36000000];
+ optional uint32 ttl_millis = 3 [default = 3600000];
}
message AlterTableRequestPB {