This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new c8d4086 ISSUE #2482: Added TCP_USER_TIMEOUT to Epoll channel config
c8d4086 is described below
commit c8d4086697f85c9093f4da2c907a13e17c198914
Author: Raúl Gracia <[email protected]>
AuthorDate: Wed Aug 25 14:05:34 2021 +0200
ISSUE #2482: Added TCP_USER_TIMEOUT to Epoll channel config
### Motivation
Added `TCP_USER_TIMEOUT` in Epoll channel config to limit the time a
connection is left sending keepalives to a non-responding Bookie.
### Changes
The original issue reported that in scenarios where Bookies may go down
unexpectedly and change their IP (e.g., Kubernetes), the BookKeeper client may
be left attempting to connect to the old IP of the restarted Bookie for some
time (see #2482 for details). To prevent this problem (for Epoll channels), we
introduce the following changes:
- Epoll channels are now configured with `TCP_USER_TIMEOUT`. This parameter
takes precedence over the underlying TCP keepalive configuration (see RFC 5482,
https://datatracker.ietf.org/doc/html/rfc5482), whose defaults may keep
retrying for too long depending on the environment (e.g., 10-15 minutes in our
experience).
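- Instead of reusing the existing `clientConnectTimeoutMillis` value, a dedicated
`clientTcpUserTimeoutMillis` property is added to `ClientConfiguration`
(`setTcpUserTimeoutMillis` / `getTcpUserTimeoutMillis`) and used to set
`TCP_USER_TIMEOUT`. If the property is not set, the channel keeps its default
TCP user timeout.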
### Validation
We have reproduced the original testing environment in which this problem
appears consistently:
- A cluster with 4 Bookies and 3 Kubernetes nodes, plus Pravega
(https://pravega.io), which uses the BookKeeper client.
- An application deployed to perform I/O against Pravega (and therefore against
BookKeeper).
- Periodically shut down a Kubernetes node, so Bookkeeper pods on it are
restarted as well.
With this test procedure and without the proposed change, we consistently
observed BookKeeper clients getting stuck trying to contact the old IPs of
restarted Bookies. With this change, we confirmed via logs that the new
configuration takes effect, and so far we have not been able to reproduce the
original problem after multiple node reboots.
Master Issue: #2482
Reviewers: Flavio Junqueira <[email protected]>, Enrico Olivelli
<[email protected]>
This closes #2761 from RaulGracia/issue-2482-close-idle-bookie-connection,
closes #2482
---
.../bookkeeper/conf/ClientConfiguration.java | 24 ++++++++++++
.../bookkeeper/proto/PerChannelBookieClient.java | 8 ++++
.../conf/NoSystemPropertiesConfigurationTest.java | 6 ++-
.../conf/SystemPropertiesConfigurationTest.java | 2 +
.../proto/TestPerChannelBookieClient.java | 45 ++++++++++++++++++++++
5 files changed, 84 insertions(+), 1 deletion(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 80c2ad4..9354451 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -22,6 +22,7 @@ import static
org.apache.bookkeeper.util.BookKeeperConstants.FEATURE_DISABLE_ENS
import io.netty.buffer.ByteBuf;
+import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -96,6 +97,7 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
protected static final String CLIENT_WRITEBUFFER_LOW_WATER_MARK =
"clientWriteBufferLowWaterMark";
protected static final String CLIENT_WRITEBUFFER_HIGH_WATER_MARK =
"clientWriteBufferHighWaterMark";
protected static final String CLIENT_CONNECT_TIMEOUT_MILLIS =
"clientConnectTimeoutMillis";
+ protected static final String CLIENT_TCP_USER_TIMEOUT_MILLIS =
"clientTcpUserTimeoutMillis";
protected static final String NUM_CHANNELS_PER_BOOKIE =
"numChannelsPerBookie";
protected static final String USE_V2_WIRE_PROTOCOL = "useV2WireProtocol";
protected static final String NETTY_USE_POOLED_BUFFERS =
"nettyUsePooledBuffers";
@@ -531,6 +533,28 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
}
/**
+ * Get client netty TCP user timeout in millis (only for Epoll channels).
+ *
+ * @return client netty Epoll user tcp timeout in millis.
+ * @throws NoSuchElementException if the property is not set.
+ */
+ public int getTcpUserTimeoutMillis() {
+ return getInt(CLIENT_TCP_USER_TIMEOUT_MILLIS);
+ }
+
+ /**
+ * Set client netty TCP user timeout in millis (only for Epoll channels).
+ *
+ * @param tcpUserTimeoutMillis
+ * client netty TCP user timeout in millis.
+ * @return client configuration.
+ */
+ public ClientConfiguration setTcpUserTimeoutMillis(int
tcpUserTimeoutMillis) {
+ setProperty(CLIENT_TCP_USER_TIMEOUT_MILLIS, tcpUserTimeoutMillis);
+ return this;
+ }
+
+ /**
* Get num channels per bookie.
*
* @return num channels per bookie.
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index f248a40..7346d19 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -42,6 +42,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.local.LocalAddress;
@@ -70,6 +71,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
@@ -540,6 +542,12 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
bootstrap.group(eventLoopGroup);
if (eventLoopGroup instanceof EpollEventLoopGroup) {
bootstrap.channel(EpollSocketChannel.class);
+ try {
+ // For Epoll channels, configure the TCP user timeout.
+ bootstrap.option(EpollChannelOption.TCP_USER_TIMEOUT,
conf.getTcpUserTimeoutMillis());
+ } catch (NoSuchElementException e) {
+ // Property not set, so keeping default value.
+ }
} else if (eventLoopGroup instanceof DefaultEventLoopGroup) {
bootstrap.channel(LocalChannel.class);
} else {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/NoSystemPropertiesConfigurationTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/NoSystemPropertiesConfigurationTest.java
index 6f03697..5fe86aa 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/NoSystemPropertiesConfigurationTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/NoSystemPropertiesConfigurationTest.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.conf;
import static org.junit.Assert.assertEquals;
+import java.util.NoSuchElementException;
import org.junit.Test;
/**
@@ -32,11 +33,14 @@ public class NoSystemPropertiesConfigurationTest {
// this property is read when AbstractConfiguration class is loaded.
// this test will work as expected only using a new JVM (or
classloader) for the test
System.setProperty(ClientConfiguration.THROTTLE, "10");
+ System.setProperty(ClientConfiguration.CLIENT_TCP_USER_TIMEOUT_MILLIS,
"20000");
}
- @Test
+ @Test(expected = NoSuchElementException.class)
public void testUseSystemProperty() {
ClientConfiguration clientConfiguration = new ClientConfiguration();
assertEquals(5000, clientConfiguration.getThrottleValue());
+ // This should throw NoSuchElementException if the property has not
been set.
+ clientConfiguration.getTcpUserTimeoutMillis();
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/SystemPropertiesConfigurationTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/SystemPropertiesConfigurationTest.java
index db02515..8ce8952 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/SystemPropertiesConfigurationTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/SystemPropertiesConfigurationTest.java
@@ -33,11 +33,13 @@ public class SystemPropertiesConfigurationTest {
// this test will work as expected only using a new JVM (or
classloader) for the test
System.setProperty(AbstractConfiguration.READ_SYSTEM_PROPERTIES_PROPERTY,
"true");
System.setProperty(ClientConfiguration.THROTTLE, "10");
+ System.setProperty(ClientConfiguration.CLIENT_TCP_USER_TIMEOUT_MILLIS,
"20000");
}
@Test
public void testUseSystemProperty() {
ClientConfiguration clientConfiguration = new ClientConfiguration();
assertEquals(10, clientConfiguration.getThrottleValue());
+ assertEquals(20000, clientConfiguration.getTcpUserTimeoutMillis());
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index 3abeb81..870e6ce 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -20,6 +20,7 @@
*/
package org.apache.bookkeeper.proto;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -28,6 +29,9 @@ import com.google.protobuf.ExtensionRegistry;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollChannelOption;
+import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.IOException;
@@ -48,9 +52,12 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.PerChannelBookieClient.ConnectionState;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.SafeRunnable;
+import org.junit.Assume;
import org.junit.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -295,4 +302,42 @@ public class TestPerChannelBookieClient extends
BookKeeperClusterTestCase {
eventLoopGroup.shutdownGracefully();
executor.shutdown();
}
+
+ /**
+ * Test that TCP user timeout is correctly set in EpollEventLoopGroup.
+ */
+ @Test
+ public void testEpollChannelTcpUserTimeout() throws Exception {
+ // Epoll is needed for this test to work.
+ Assume.assumeTrue(Epoll.isAvailable());
+
+ EventLoopGroup eventLoopGroup = new EpollEventLoopGroup();
+ OrderedExecutor executor = getOrderedSafeExecutor();
+ ClientConfiguration conf = new ClientConfiguration();
+ int tcpUserTimeout = 1234;
+ BookieId addr = getBookie(0);
+
+ // Pass to the PerChannelBookieClient object the client configuration
with TCP user timeout.
+ PerChannelBookieClient channel = new PerChannelBookieClient(conf,
executor, eventLoopGroup,
+ addr, Mockito.mock(StatsLogger.class), authProvider,
extRegistry,
+ Mockito.mock(PerChannelBookieClientPool.class),
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+
+ // Verify that the configured value has not been set in the channel if
does not exist in config.
+ assertEquals(channel.connect().channel().config()
+ .getOption(EpollChannelOption.TCP_USER_TIMEOUT).intValue(), 0);
+ channel.close();
+
+ // Create a new channel with new TCP user timeout set.
+ conf.setTcpUserTimeoutMillis(tcpUserTimeout);
+ channel = new PerChannelBookieClient(conf, executor, eventLoopGroup,
+ addr, Mockito.mock(StatsLogger.class), authProvider,
extRegistry,
+ Mockito.mock(PerChannelBookieClientPool.class),
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+
+ // Verify that the configured value has been set.
+ assertEquals(channel.connect().channel().config()
+
.getOption(EpollChannelOption.TCP_USER_TIMEOUT).intValue(), tcpUserTimeout);
+ channel.close();
+ eventLoopGroup.shutdownGracefully();
+ executor.shutdown();
+ }
}