[
https://issues.apache.org/jira/browse/FLINK-24369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17433625#comment-17433625
]
Yao Zhang commented on FLINK-24369:
-----------------------------------
Hi [~turtlebin],
I am not quite sure that what has been changed on Flink 1.13.0. But I suggest
we might need a shutdown hook that ensure each operator is closed before JVM
terminates. Does anyone else have better solutions?
> Resource Leak may happen if flink cluster runs abnormally and repeatedly retry
> ------------------------------------------------------------------------------
>
> Key: FLINK-24369
> URL: https://issues.apache.org/jira/browse/FLINK-24369
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.13.0
> Environment: flink version:1.13.0
> redisson version:3.16.1
> Reporter: turtlebin
> Priority: Critical
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> I encountered a problem in the process of integrating Flink and Redisson.
> When the task encounters abnormalities and keeps retries, it will cause the
> number of Redis Clients to increase volatility (sometimes the number
> increases, sometimes the number decreases, but the overall trend is growth).
> Even if I shutdown the Redisson Instance by overwriting the close function ,
> the number of Redis-Clients cannot be prevented from continuing to grow, and
> eventually the number of Clients will reach the upper limit and an error will
> be thrown. Moreover, this situation only occurs in the Flink cluster
> operation mode with version 1.13.0 or above, and the number of Redis-Clients
> will remain stable in the local mode or in the cluster mode with version
> under1.12. The test code is below. I wonder if you can provide specific
> reasons and solutions for this situation, thank you.
> {code:java}
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import java.util.Properties;
> import java.util.Random;
> public class ExceptionTest {
> public static void main(String[] args) throws Exception{
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> env.enableCheckpointing(1000 * 60);
> DataStream<String> mock = createDataStream(env);
> mock.keyBy(x -> new Random().nextInt(20))
> .process(new ExceptionTestFunction())
> .uid("batch-query-key-process")
> .filter(x->x!=null)
> .print();
> env.execute("Exception-Test");
> }
> private static DataStream<String>
> createDataStream(StreamExecutionEnvironment env) {
> String topic = "test_topic_xhb03";
> Properties test = new Properties();
> test.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
> test.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");
> FlinkKafkaConsumer<String> consumer = new
> FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), test);
> consumer.setStartFromLatest();
> DataStream<String> source = env.addSource(consumer);
> return source;
> }
> }
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> import org.redisson.Redisson;
> import org.redisson.api.RedissonRxClient;
> import org.redisson.config.Config;
> @Slf4j
> public class ExceptionTestFunction extends KeyedProcessFunction<Integer,
> String, String> {
> private RedissonRxClient redisson;
> @Override
> public void close() {
> this.redisson.shutdown();
> log.info(String.format("Shut down redisson instance in close method,
> RedissonRxClient shutdown is %s", redisson.isShutdown()));
> }
>
> @Override
> public void open(Configuration parameters) {
> String prefix = "redis://";
> Config config = new Config();
> config.useSingleServer()
> .setClientName("xhb-redisson-main")
> .setTimeout(5000)
> .setConnectTimeout(10000)
> .setConnectionPoolSize(4)
> .setConnectionMinimumIdleSize(2)
> .setIdleConnectionTimeout(10000)
> .setAddress("127.0.0.1:6379")
> .setDatabase(0)
> .setPassword(null);
> this.redisson = Redisson.create(config).rxJava();
> }
> @Override
> public void processElement(String value, Context ctx, Collector<String>
> out) throws Exception {
> throw new NullPointerException("Null Pointer in ProcessElement");
> }
> }
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)