[ https://issues.apache.org/jira/browse/BAHIR-306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17578558#comment-17578558 ]
João Boto commented on BAHIR-306: --------------------------------- Those are old connectors from flink.. To use bahir connectors you have to use https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis > Please release a new version of flink-connector-redis_2.11 to fix the Jedis > bug > ------------------------------------------------------------------------------- > > Key: BAHIR-306 > URL: https://issues.apache.org/jira/browse/BAHIR-306 > Project: Bahir > Issue Type: Wish > Components: Flink Streaming Connectors > Reporter: Yang Bodong > Assignee: João Boto > Priority: Blocker > Fix For: Not Applicable > > > It has been 5 years since the latest `flink-connector-redis_2.11` release > [https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis_2.11] > , we recently encountered the following error when upgrading Redis version: > > {code:java} > java.lang.ExceptionInInitializerError > at > com.ymm.realtime.function.DesignedMapFunction.flatMap(DesignedMapFunction.java:41) > at > com.ymm.realtime.function.DesignedMapFunction.flatMap(DesignedMapFunction.java:16) > at > org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) > at > com.ymm.realtime.KafkaBooStrap.lambda$main$f53f5b13$1(KafkaBooStrap.java:52) > at > org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) > Caused by: java.lang.NumberFormatException: For input string: "6379@13028" > at > java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) > at java.lang.Integer.parseInt(Integer.java:580) > at java.lang.Integer.valueOf(Integer.java:766) > at > redis.clients.util.ClusterNodeInformationParser.getHostAndPortFromNodeLine(ClusterNodeInformationParser.java:39) > at > redis.clients.util.ClusterNodeInformationParser.parse(ClusterNodeInformationParser.java:14) > at > redis.clients.jedis.JedisClusterInfoCache.discoverClusterNodesAndSlots(JedisClusterInfoCache.java:50) > at > redis.clients.jedis.JedisClusterConnectionHandler.initializeSlotsCache(JedisClusterConnectionHandler.java:39) > at > redis.clients.jedis.JedisClusterConnectionHandler.<init>(JedisClusterConnectionHandler.java:28) > at > redis.clients.jedis.JedisSlotBasedConnectionHandler.<init>(JedisSlotBasedConnectionHandler.java:21) > at redis.clients.jedis.BinaryJedisCluster.<init>(BinaryJedisCluster.java:46) > at redis.clients.jedis.JedisCluster.<init>(JedisCluster.java:50) > at com.ymm.realtime.util.RedisUtils.initJedisCluster(RedisUtils.java:36) > at com.ymm.realtime.util.RedisUtils.<clinit>(RedisUtils.java:25) {code} > > I have confirmed that the problem is because `flink-connector-redis_2.11` > depends on `Jedis` version `2.8.0`. But `Jedis` has [fixed this > problem|https://github.com/redis/jedis/issues/1958] in `2.9.0`. It is worth > mentioning that the latest code on GitHub shows that > `flink-connector-redis_2.11` has also been updated to the `Jedis` version > [https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/pom.xml#L37] > , but I have not Searching for the latest package on mvn, what did I miss? -- This message was sent by Atlassian Jira (v8.20.10#820010)