[
https://issues.apache.org/jira/browse/FLINK-22247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-22247:
-----------------------------------
Labels: auto-deprioritized-major (was: stale-major)
Priority: Minor (was: Major)
This issue was labeled "stale-major" 7 ago and has not received any updates so
it is being deprioritized. If this ticket is actually Major, please raise the
priority and ask a committer to assign you the issue or revive the public
discussion.
> can not pass AddressList when connecting to rabbitmq
> ----------------------------------------------------
>
> Key: FLINK-22247
> URL: https://issues.apache.org/jira/browse/FLINK-22247
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.12.2
> Environment: flink: 1.12.2
> rabbitmq: 3.8.4
> Reporter: Spongebob
> Priority: Minor
> Labels: auto-deprioritized-major
>
> We hope to connect to rabbitmq cluster address when using rabbitmq connector,
> So we override the setupConnection function to pass the rabbitmq cluster
> address, but the address class is not serializable thereby flink throws
> exception.
> {code:java}
> //代码占位符
> val rabbitmqAddresses = Array(
> new Address("xxx1", 5672),
> new Address("xxx2", 5672),
> new Address("xxx3", 5672))
> val dataStream = streamEnv
> .addSource(new RMQSource[String](
> rabbitmqConfig, // rabbitmq's connection config
> "queueName", // queue name
> true, // using correlation ids, assurance of exactly-once consume from
> rabbitmq
> new SimpleStringSchema // java deserialization
> ) {
> override def setupQueue(): Unit = {}
> override def setupConnection(): Connection = {
> rabbitmqConfig.getConnectionFactory.newConnection(rabbitmqAddresses)
> }
> }).setParallelism(1)
> {code}
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException:
> [Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object
> probably contains or references non serializable fields.Exception in thread
> "main" org.apache.flink.api.common.InvalidProgramException:
> [Lcom.rabbitmq.client.Address;@436a4e4b is not serializable. The object
> probably contains or references non serializable fields. at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164) at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1685)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1652)
> at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:693)
> at testConsumer$.main(testConsumer.scala:30) at
> testConsumer.main(testConsumer.scala)Caused by:
> java.io.NotSerializableException: com.rabbitmq.client.Address at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at
> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
> ... 9 more
--
This message was sent by Atlassian Jira
(v8.3.4#803005)