[
https://issues.apache.org/jira/browse/FLINK-22247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-22247:
-----------------------------------
Labels: stale-major (was: )
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Major but is unassigned, and neither it nor its Sub-Tasks have been updated
for 30 days. I have gone ahead and added a "stale-major" label to the issue. If
this ticket is a Major, please either assign yourself or give an update.
Afterwards, please remove the label, or in 7 days the issue will be deprioritized.
> can not pass AddressList when connecting to rabbitmq
> ----------------------------------------------------
>
> Key: FLINK-22247
> URL: https://issues.apache.org/jira/browse/FLINK-22247
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.12.2
> Environment: flink: 1.12.2
> rabbitmq: 3.8.4
> Reporter: Spongebob
> Priority: Major
> Labels: stale-major
>
> We want to connect to a RabbitMQ cluster when using the RabbitMQ connector,
> so we override the setupConnection function to pass the cluster addresses.
> However, the Address class is not serializable, so Flink throws an
> exception.
> {code:java}
> // code placeholder
> val rabbitmqAddresses = Array(
>   new Address("xxx1", 5672),
>   new Address("xxx2", 5672),
>   new Address("xxx3", 5672))
> val dataStream = streamEnv
>   .addSource(new RMQSource[String](
>     rabbitmqConfig,        // RabbitMQ connection config
>     "queueName",           // queue name
>     true,                  // use correlation ids for exactly-once consumption from RabbitMQ
>     new SimpleStringSchema // Java deserialization schema
>   ) {
>     override def setupQueue(): Unit = {}
>     // rabbitmqAddresses is captured by this anonymous class and serialized with it
>     override def setupConnection(): Connection = {
>       rabbitmqConfig.getConnectionFactory.newConnection(rabbitmqAddresses)
>     }
>   }).setParallelism(1)
> {code}
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object probably contains or references non serializable fields.
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1685)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1652)
>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:693)
>     at testConsumer$.main(testConsumer.scala:30)
>     at testConsumer.main(testConsumer.scala)
> Caused by: java.io.NotSerializableException: com.rabbitmq.client.Address
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
>     ... 9 more
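
A possible workaround, as an untested sketch rather than a confirmed fix: the trace shows the Address array being serialized as part of the source, so one option is to capture only plain (host, port) pairs, which are serializable, and build the Address objects inside setupConnection. The object name RmqClusterSource and the helper toAddresses are hypothetical and not part of Flink. The sketch assumes the RMQSource constructor and the setupQueue/setupConnection overrides used in the report, plus Address(String, int) and ConnectionFactory.newConnection(Address[]) from the RabbitMQ Java client.

```scala
import com.rabbitmq.client.{Address, Connection}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig

// Hypothetical helper object; its name and methods are not part of Flink.
object RmqClusterSource {

  // Build the non-serializable Address objects from plain (host, port) pairs.
  def toAddresses(hosts: Array[(String, Int)]): Array[Address] =
    hosts.map { case (host, port) => new Address(host, port) }

  // The anonymous subclass captures only `config` and `hosts`,
  // both of which are serializable.
  def apply(config: RMQConnectionConfig,
            queue: String,
            hosts: Array[(String, Int)]): RMQSource[String] =
    new RMQSource[String](config, queue, true, new SimpleStringSchema) {
      // Same no-op override as in the report.
      override def setupQueue(): Unit = {}

      // Address objects are created here, when the connection is set up,
      // so they are never part of the serialized source.
      override def setupConnection(): Connection =
        config.getConnectionFactory.newConnection(toAddresses(hosts))
    }
}
```

With `import org.apache.flink.streaming.api.scala._` in scope, this could be used as `streamEnv.addSource(RmqClusterSource(rabbitmqConfig, "queueName", Array(("xxx1", 5672), ("xxx2", 5672), ("xxx3", 5672)))).setParallelism(1)`. This only avoids the serialization failure at job construction; it does not add cluster address support to the connector itself.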
--
This message was sent by Atlassian Jira
(v8.3.4#803005)