This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new c8d4086  ISSUE #2482: Added TCP_USER_TIMEOUT to Epoll channel config
c8d4086 is described below

commit c8d4086697f85c9093f4da2c907a13e17c198914
Author: Raúl Gracia <[email protected]>
AuthorDate: Wed Aug 25 14:05:34 2021 +0200

    ISSUE #2482: Added TCP_USER_TIMEOUT to Epoll channel config
    
    ### Motivation
    
    Added `TCP_USER_TIMEOUT` in Epoll channel config to limit the time a 
connection is left sending keepalives to a non-responding Bookie.
    
    ### Changes
    
    The original issue reported that in scenarios where Bookies may go down 
unexpectedly and change their IP (e.g., Kubernetes), the Bookkeeper client may 
be left for some time attempting to connect to the old IP of the restarted 
Bookie (see #2482 for details). To prevent this problem from happening (in 
Epoll channels), we introduce the following changes:
    - Epoll channels are now configured with `TCP_USER_TIMEOUT`. This parameter 
takes precedence over the underlying TCP keepalive configuration (see 
https://datatracker.ietf.org/doc/html/rfc5482), which may default to retrying 
for too long depending on the environment (e.g., 10-15 minutes in our 
experience).
    - The timeout is read from a new, optional `clientTcpUserTimeoutMillis` 
property in `ClientConfiguration` (`getTcpUserTimeoutMillis()` / 
`setTcpUserTimeoutMillis(int)`). When the property is not set, 
`TCP_USER_TIMEOUT` is left at its default value.
    
    ### Validation
    
    We have reproduced the original testing environment in which this problem 
appears consistently:
    - Cluster with 4 Bookies and 3 Kubernetes nodes, in addition to 
https://pravega.io which uses the Bookkeeper client.
    - Deployed an application to do IO to Pravega (and therefore, to 
Bookkeeper).
    - Periodically shut down a Kubernetes node, so Bookkeeper pods on it are 
restarted as well.
    
    Considering this test procedure, without the proposed PR we consistently 
observe Bookkeeper clients getting stuck trying to contact the old IPs of 
Bookies. With this change, we confirmed via logs that the configuration change 
takes effect, and we have not been able to reproduce the original problem so far 
after performing multiple node reboots.
    
    Master Issue: #2482
    
    
    Reviewers: Flavio Junqueira <[email protected]>, Enrico Olivelli 
<[email protected]>
    
    This closes #2761 from RaulGracia/issue-2482-close-idle-bookie-connection, 
closes #2482
---
 .../bookkeeper/conf/ClientConfiguration.java       | 24 ++++++++++++
 .../bookkeeper/proto/PerChannelBookieClient.java   |  8 ++++
 .../conf/NoSystemPropertiesConfigurationTest.java  |  6 ++-
 .../conf/SystemPropertiesConfigurationTest.java    |  2 +
 .../proto/TestPerChannelBookieClient.java          | 45 ++++++++++++++++++++++
 5 files changed, 84 insertions(+), 1 deletion(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 80c2ad4..9354451 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -22,6 +22,7 @@ import static 
org.apache.bookkeeper.util.BookKeeperConstants.FEATURE_DISABLE_ENS
 
 import io.netty.buffer.ByteBuf;
 
+import java.util.NoSuchElementException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -96,6 +97,7 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
     protected static final String CLIENT_WRITEBUFFER_LOW_WATER_MARK = 
"clientWriteBufferLowWaterMark";
     protected static final String CLIENT_WRITEBUFFER_HIGH_WATER_MARK = 
"clientWriteBufferHighWaterMark";
     protected static final String CLIENT_CONNECT_TIMEOUT_MILLIS = 
"clientConnectTimeoutMillis";
+    protected static final String CLIENT_TCP_USER_TIMEOUT_MILLIS = 
"clientTcpUserTimeoutMillis";
     protected static final String NUM_CHANNELS_PER_BOOKIE = 
"numChannelsPerBookie";
     protected static final String USE_V2_WIRE_PROTOCOL = "useV2WireProtocol";
     protected static final String NETTY_USE_POOLED_BUFFERS = 
"nettyUsePooledBuffers";
@@ -531,6 +533,28 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
     }
 
     /**
+     * Get client netty TCP user timeout in millis (only for Epoll channels).
+     *
+     * @return client netty Epoll user tcp timeout in millis.
+     * @throws NoSuchElementException if the property is not set.
+     */
+    public int getTcpUserTimeoutMillis() {
+        return getInt(CLIENT_TCP_USER_TIMEOUT_MILLIS);
+    }
+
+    /**
+     * Set client netty TCP user timeout in millis (only for Epoll channels).
+     *
+     * @param tcpUserTimeoutMillis
+     *          client netty TCP user timeout in millis.
+     * @return client configuration.
+     */
+    public ClientConfiguration setTcpUserTimeoutMillis(int 
tcpUserTimeoutMillis) {
+        setProperty(CLIENT_TCP_USER_TIMEOUT_MILLIS, tcpUserTimeoutMillis);
+        return this;
+    }
+
+    /**
      * Get num channels per bookie.
      *
      * @return num channels per bookie.
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index f248a40..7346d19 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -42,6 +42,7 @@ import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultEventLoopGroup;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.EpollChannelOption;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.local.LocalAddress;
@@ -70,6 +71,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
@@ -540,6 +542,12 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         bootstrap.group(eventLoopGroup);
         if (eventLoopGroup instanceof EpollEventLoopGroup) {
             bootstrap.channel(EpollSocketChannel.class);
+            try {
+                // For Epoll channels, configure the TCP user timeout.
+                bootstrap.option(EpollChannelOption.TCP_USER_TIMEOUT, 
conf.getTcpUserTimeoutMillis());
+            } catch (NoSuchElementException e) {
+                // Property not set, so keeping default value.
+            }
         } else if (eventLoopGroup instanceof DefaultEventLoopGroup) {
             bootstrap.channel(LocalChannel.class);
         } else {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/NoSystemPropertiesConfigurationTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/NoSystemPropertiesConfigurationTest.java
index 6f03697..5fe86aa 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/NoSystemPropertiesConfigurationTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/NoSystemPropertiesConfigurationTest.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.conf;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.NoSuchElementException;
 import org.junit.Test;
 
 /**
@@ -32,11 +33,14 @@ public class NoSystemPropertiesConfigurationTest {
         // this property is read when AbstractConfiguration class is loaded.
         // this test will work as expected only using a new JVM (or 
classloader) for the test
         System.setProperty(ClientConfiguration.THROTTLE, "10");
+        System.setProperty(ClientConfiguration.CLIENT_TCP_USER_TIMEOUT_MILLIS, 
"20000");
     }
 
-    @Test
+    @Test(expected = NoSuchElementException.class)
     public void testUseSystemProperty() {
         ClientConfiguration clientConfiguration = new ClientConfiguration();
         assertEquals(5000, clientConfiguration.getThrottleValue());
+        // This should throw NoSuchElementException if the property has not 
been set.
+        clientConfiguration.getTcpUserTimeoutMillis();
     }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/SystemPropertiesConfigurationTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/SystemPropertiesConfigurationTest.java
index db02515..8ce8952 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/SystemPropertiesConfigurationTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/SystemPropertiesConfigurationTest.java
@@ -33,11 +33,13 @@ public class SystemPropertiesConfigurationTest {
         // this test will work as expected only using a new JVM (or 
classloader) for the test
         
System.setProperty(AbstractConfiguration.READ_SYSTEM_PROPERTIES_PROPERTY, 
"true");
         System.setProperty(ClientConfiguration.THROTTLE, "10");
+        System.setProperty(ClientConfiguration.CLIENT_TCP_USER_TIMEOUT_MILLIS, 
"20000");
     }
 
     @Test
     public void testUseSystemProperty() {
         ClientConfiguration clientConfiguration = new ClientConfiguration();
         assertEquals(10, clientConfiguration.getThrottleValue());
+        assertEquals(20000, clientConfiguration.getTcpUserTimeoutMillis());
     }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index 3abeb81..870e6ce 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.proto;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -28,6 +29,9 @@ import com.google.protobuf.ExtensionRegistry;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollChannelOption;
+import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 
 import java.io.IOException;
@@ -48,9 +52,12 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.PerChannelBookieClient.ConnectionState;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.junit.Assume;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -295,4 +302,42 @@ public class TestPerChannelBookieClient extends 
BookKeeperClusterTestCase {
         eventLoopGroup.shutdownGracefully();
         executor.shutdown();
     }
+
+    /**
+     * Test that TCP user timeout is correctly set in EpollEventLoopGroup.
+     */
+    @Test
+    public void testEpollChannelTcpUserTimeout() throws Exception {
+        // Epoll is needed for this test to work.
+        Assume.assumeTrue(Epoll.isAvailable());
+
+        EventLoopGroup eventLoopGroup = new EpollEventLoopGroup();
+        OrderedExecutor executor = getOrderedSafeExecutor();
+        ClientConfiguration conf = new ClientConfiguration();
+        int tcpUserTimeout = 1234;
+        BookieId addr = getBookie(0);
+
+        // Pass to the PerChannelBookieClient object the client configuration 
with TCP user timeout.
+        PerChannelBookieClient channel = new PerChannelBookieClient(conf, 
executor, eventLoopGroup,
+                addr, Mockito.mock(StatsLogger.class), authProvider, 
extRegistry,
+                Mockito.mock(PerChannelBookieClientPool.class), 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+
+        // Verify that the configured value has not been set in the channel if 
does not exist in config.
+        assertEquals(channel.connect().channel().config()
+                .getOption(EpollChannelOption.TCP_USER_TIMEOUT).intValue(), 0);
+        channel.close();
+
+        // Create a new channel with new TCP user timeout set.
+        conf.setTcpUserTimeoutMillis(tcpUserTimeout);
+        channel = new PerChannelBookieClient(conf, executor, eventLoopGroup,
+                addr, Mockito.mock(StatsLogger.class), authProvider, 
extRegistry,
+                Mockito.mock(PerChannelBookieClientPool.class), 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+
+        // Verify that the configured value has been set.
+        assertEquals(channel.connect().channel().config()
+                        
.getOption(EpollChannelOption.TCP_USER_TIMEOUT).intValue(), tcpUserTimeout);
+        channel.close();
+        eventLoopGroup.shutdownGracefully();
+        executor.shutdown();
+    }
 }

Reply via email to