[ https://issues.apache.org/jira/browse/FLINK-24369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
turtlebin updated FLINK-24369:
------------------------------
Flags: Important
Description:
I ran into a problem while integrating Flink with Redisson. When a task fails
with an exception and keeps restarting, the number of Redis clients fluctuates:
it sometimes rises and sometimes falls, but the overall trend is upward. Even
though I shut down the Redisson instance by overriding the close() method, the
number of Redis clients keeps growing; eventually it reaches the upper limit and
an error is thrown. Moreover, this only happens when running in cluster mode on
Flink 1.13.0 or later. The number of Redis clients stays stable in local mode,
and in cluster mode with versions under 1.12. The test code is below. Could you
explain the specific cause and suggest a solution for this situation? Thank you.
{code:java}
// ExceptionTest.java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;
import java.util.Random;

public class ExceptionTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.enableCheckpointing(1000 * 60); // checkpoint every 60 s

        DataStream<String> mock = createDataStream(env);
        // Note: Flink expects key selectors to be deterministic; the random key
        // is kept only because it is part of the original reproduction.
        mock.keyBy(x -> new Random().nextInt(20))
                .process(new ExceptionTestFunction())
                .uid("batch-query-key-process")
                .filter(x -> x != null)
                .print();

        env.execute("Exception-Test");
    }

    private static DataStream<String> createDataStream(StreamExecutionEnvironment env) {
        String topic = "test_topic_xhb03";
        Properties test = new Properties();
        test.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
        test.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), test);
        consumer.setStartFromLatest();
        return env.addSource(consumer);
    }
}
// ExceptionTestFunction.java
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.redisson.Redisson;
import org.redisson.api.RedissonRxClient;
import org.redisson.config.Config;

@Slf4j
public class ExceptionTestFunction extends KeyedProcessFunction<Integer, String, String> {

    private RedissonRxClient redisson;

    @Override
    public void close() {
        // redisson is still null if open() failed before the client was created
        if (this.redisson != null) {
            this.redisson.shutdown();
            log.info("Shut down Redisson instance in close(); RedissonRxClient isShutdown = {}",
                    redisson.isShutdown());
        }
    }

    @Override
    public void open(Configuration parameters) {
        String prefix = "redis://";
        Config config = new Config();
        config.useSingleServer()
                .setClientName("xhb-redisson-main")
                .setTimeout(5000)
                .setConnectTimeout(10000)
                .setConnectionPoolSize(4)
                .setConnectionMinimumIdleSize(2)
                .setIdleConnectionTimeout(10000)
                // Redisson expects the address to start with redis:// (or rediss:// for SSL)
                .setAddress(prefix + "127.0.0.1:6379")
                .setDatabase(0)
                .setPassword(null);
        // Every open() creates a new Redisson instance with its own connection pool
        this.redisson = Redisson.create(config).rxJava();
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Fail on every record so that the job keeps failing and restarting
        throw new NullPointerException("Null Pointer in ProcessElement");
    }
}
{code}
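One possible way to bound the number of clients, independent of whatever the root cause turns out to be, is to share a single client per TaskManager JVM instead of creating a new one in every open(). The class below is a minimal, hypothetical sketch of that idea: SharedClient, acquire() and release() are illustrative names, not Flink or Redisson API. It creates the client on the first acquire() and shuts it down when the last holder calls release(). It assumes close() really is invoked on every failed attempt, and whether a static holder outlives a restart depends on how Flink manages the user-code classloader in a given deployment, so treat it as a workaround to verify rather than a fix.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of Flink or Redisson: one shared client per JVM,
// created on the first acquire() and shut down when the last holder calls release().
final class SharedClient<T> {
    private final Supplier<T> factory; // creates the client, e.g. () -> Redisson.create(config)
    private final Consumer<T> closer;  // shuts it down, e.g. RedissonClient::shutdown
    private T instance;                // null while nobody holds the client
    private int refCount;              // acquire() calls not yet matched by release()

    SharedClient(Supplier<T> factory, Consumer<T> closer) {
        this.factory = factory;
        this.closer = closer;
    }

    // Call from open(): reuses the live client instead of creating another one.
    synchronized T acquire() {
        if (instance == null) {
            instance = factory.get();
        }
        refCount++;
        return instance;
    }

    // Call from close(), and only if this function's open() reached acquire().
    synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() without matching acquire()");
        }
        if (--refCount == 0) {
            T toClose = instance;
            instance = null;
            closer.accept(toClose);
        }
    }

    synchronized int refCount() {
        return refCount;
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        AtomicInteger closed = new AtomicInteger();
        SharedClient<Object> shared = new SharedClient<>(
                () -> { created.incrementAndGet(); return new Object(); },
                c -> closed.incrementAndGet());

        // Simulate three failed attempts, each opening and closing two parallel subtasks.
        for (int attempt = 0; attempt < 3; attempt++) {
            shared.acquire();
            shared.acquire();
            shared.release();
            shared.release();
        }
        System.out.println("created=" + created.get() + " closed=" + closed.get()); // prints created=3 closed=3
    }
}
{code}

In ExceptionTestFunction this could be wired up as a static field such as new SharedClient<>(() -> Redisson.create(config), RedissonClient::shutdown) built from a static Config, with open() calling acquire().rxJava() and close() calling release() only when open() got as far as acquiring the client.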
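To put numbers on the client growth described above, one option is to capture the output of the Redis CLIENT LIST command at intervals (for example with redis-cli CLIENT LIST) and count the connections carrying the name set via setClientName("xhb-redisson-main"), assuming Redisson registers that name on each connection it opens. The helper below is a hypothetical sketch of that counting step; it relies only on CLIENT LIST returning one connection per line as space-separated key=value fields, and the sample lines in main are made up for illustration.

{code:java}
import java.util.Arrays;

// Hypothetical diagnostic helper, not part of Redis, Redisson or Flink.
final class ClientListCounter {

    // Counts lines of CLIENT LIST output whose name field equals clientName.
    // Each line describes one connection as space-separated key=value fields.
    static long countByName(String clientListOutput, String clientName) {
        String wanted = "name=" + clientName;
        return Arrays.stream(clientListOutput.split("\\R"))
                .filter(line -> Arrays.stream(line.trim().split(" ")).anyMatch(wanted::equals))
                .count();
    }

    public static void main(String[] args) {
        // Made-up, abbreviated lines in the CLIENT LIST format, for illustration only.
        String sample = String.join("\n",
                "id=3 addr=10.0.0.5:51234 fd=8 name=xhb-redisson-main db=0 cmd=ping",
                "id=4 addr=10.0.0.5:51236 fd=9 name=xhb-redisson-main db=0 cmd=ping",
                "id=5 addr=10.0.0.7:40110 fd=10 name= db=0 cmd=client");
        System.out.println(countByName(sample, "xhb-redisson-main")); // prints 2
    }
}
{code}

Sampling this count before and after a few restarts would show whether connections from shut-down Redisson instances are actually released.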
Environment:
Flink version: 1.13.0
Redisson version: 3.16.1
Remaining Estimate: 24h
Original Estimate: 24h
> Resource Leak may happen if flink cluster runs abnormally and repeatedly retry
> ------------------------------------------------------------------------------
>
> Key: FLINK-24369
> URL: https://issues.apache.org/jira/browse/FLINK-24369
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.13.0
> Environment: Flink version: 1.13.0
> Redisson version: 3.16.1
> Reporter: turtlebin
> Priority: Critical
> Original Estimate: 24h
> Remaining Estimate: 24h
--
This message was sent by Atlassian Jira
(v8.3.4#803005)