This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6be0317cf35f24380b754d4f397d3d30403d72fb Author: Lari Hotari <[email protected]> AuthorDate: Thu Apr 6 13:00:49 2023 +0300 [fix][proxy] Fix connection read timeout handling in Pulsar Proxy (#20014) (cherry picked from commit dd054080490f01e9ba9694756b335e3e10a637d5) --- .../pulsar/proxy/server/DirectProxyHandler.java | 6 ++- .../pulsar/proxy/server/ProxyConnection.java | 3 ++ .../proxy/server/ProxyReadTimeoutHandler.java | 46 ---------------------- .../proxy/server/ServiceChannelInitializer.java | 3 +- 4 files changed, 10 insertions(+), 48 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index ee5baaa8b85..1a902d38ba7 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -45,6 +45,7 @@ import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslProvider; +import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; import java.util.Arrays; @@ -206,7 +207,7 @@ public class DirectProxyHandler { int brokerProxyReadTimeoutMs = service.getConfiguration().getBrokerProxyReadTimeoutMs(); if (brokerProxyReadTimeoutMs > 0) { ch.pipeline().addLast("readTimeoutHandler", - new ProxyReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS)); + new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS)); } ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); @@ -364,6 +365,9 @@ public class DirectProxyHandler { if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel == 0) { if (!isTlsOutboundChannel && !DirectProxyHandler.this.proxyConnection.isTlsInboundChannel) { + if (ctx.pipeline().get("readTimeoutHandler") != null) { + ctx.pipeline().remove("readTimeoutHandler"); + } ProxyConnection.spliceNIC2NIC((EpollSocketChannel) ctx.channel(), (EpollSocketChannel) inboundChannel, ProxyConnection.SPLICE_BYTES) .addListener(future -> { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 97e5079f8a8..94e5a1345d7 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -255,6 +255,9 @@ public class ProxyConnection extends PulsarHandler { if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel == 0) { if (!directProxyHandler.isTlsOutboundChannel && !isTlsInboundChannel) { + if (ctx.pipeline().get("readTimeoutHandler") != null) { + ctx.pipeline().remove("readTimeoutHandler"); + } spliceNIC2NIC((EpollSocketChannel) ctx.channel(), (EpollSocketChannel) directProxyHandler.outboundChannel, SPLICE_BYTES) .addListener(future -> { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyReadTimeoutHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyReadTimeoutHandler.java deleted file mode 100644 index f0cf2a2a41c..00000000000 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyReadTimeoutHandler.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.proxy.server; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.timeout.IdleStateHandler; -import io.netty.handler.timeout.ReadTimeoutHandler; -import java.lang.reflect.Field; -import java.util.concurrent.TimeUnit; - -public class ProxyReadTimeoutHandler extends ReadTimeoutHandler { - - private final Field readingField; - - public ProxyReadTimeoutHandler(long timeout, TimeUnit unit) { - super(timeout, unit); - try { - this.readingField = IdleStateHandler.class.getDeclaredField("reading"); - this.readingField.setAccessible(true); - } catch (NoSuchFieldException e) { - throw new IllegalArgumentException("Exception caused while get 'reading' field", e); - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - this.readingField.setBoolean(this, true); - super.channelReadComplete(ctx); - } -} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java index dced772e30e..fc7c78d6a24 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java @@ -25,6 +25,7 @@ import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslProvider; +import io.netty.handler.timeout.ReadTimeoutHandler; import java.util.concurrent.TimeUnit; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder; @@ -104,7 +105,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel> } if (brokerProxyReadTimeoutMs > 0) { ch.pipeline().addLast("readTimeoutHandler", - new ProxyReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS)); + new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS)); } if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) { ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
