[ 
https://issues.apache.org/jira/browse/BAHIR-306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Bodong closed BAHIR-306.
-----------------------------

> Please release a new version of flink-connector-redis_2.11 to fix the Jedis 
> bug
> -------------------------------------------------------------------------------
>
>                 Key: BAHIR-306
>                 URL: https://issues.apache.org/jira/browse/BAHIR-306
>             Project: Bahir
>          Issue Type: Wish
>          Components: Flink Streaming Connectors
>            Reporter: Yang Bodong
>            Assignee: João Boto
>            Priority: Blocker
>             Fix For: Not Applicable
>
>
> It has been 5 years since the latest `flink-connector-redis_2.11` release 
> [https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis_2.11]
>  , we recently encountered the following error when upgrading Redis version:
>  
> {code:java}
> java.lang.ExceptionInInitializerError
>   at 
> com.ymm.realtime.function.DesignedMapFunction.flatMap(DesignedMapFunction.java:41)
>   at 
> com.ymm.realtime.function.DesignedMapFunction.flatMap(DesignedMapFunction.java:16)
>   at 
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
>   at 
> com.ymm.realtime.KafkaBooStrap.lambda$main$f53f5b13$1(KafkaBooStrap.java:52)
>   at 
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: java.lang.NumberFormatException: For input string: "6379@13028"
>   at 
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>   at java.lang.Integer.parseInt(Integer.java:580)
>   at java.lang.Integer.valueOf(Integer.java:766)
>   at 
> redis.clients.util.ClusterNodeInformationParser.getHostAndPortFromNodeLine(ClusterNodeInformationParser.java:39)
>   at 
> redis.clients.util.ClusterNodeInformationParser.parse(ClusterNodeInformationParser.java:14)
>   at 
> redis.clients.jedis.JedisClusterInfoCache.discoverClusterNodesAndSlots(JedisClusterInfoCache.java:50)
>   at 
> redis.clients.jedis.JedisClusterConnectionHandler.initializeSlotsCache(JedisClusterConnectionHandler.java:39)
>   at 
> redis.clients.jedis.JedisClusterConnectionHandler.<init>(JedisClusterConnectionHandler.java:28)
>   at 
> redis.clients.jedis.JedisSlotBasedConnectionHandler.<init>(JedisSlotBasedConnectionHandler.java:21)
>   at redis.clients.jedis.BinaryJedisCluster.<init>(BinaryJedisCluster.java:46)
>   at redis.clients.jedis.JedisCluster.<init>(JedisCluster.java:50)
>   at com.ymm.realtime.util.RedisUtils.initJedisCluster(RedisUtils.java:36)
>   at com.ymm.realtime.util.RedisUtils.<clinit>(RedisUtils.java:25) {code}
>  
> I have confirmed that the problem is because `flink-connector-redis_2.11` 
> depends on `Jedis` version `2.8.0`. But `Jedis` has [fixed this 
> problem|https://github.com/redis/jedis/issues/1958] in `2.9.0`. It is worth 
> mentioning that the latest code on GitHub shows that 
> `flink-connector-redis_2.11` has also been updated to the `Jedis` version 
> [https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/pom.xml#L37]
>  , but I have not Searching for the latest package on mvn, what did I miss?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to