icchux opened a new issue, #79:
URL: https://github.com/apache/rocketmq-flink/issues/79
When I tested connecting to Alibaba Cloud RocketMQ from a local IntelliJ IDEA run, I
encountered the error below. How can I solve it?
### Env
<rocketmq.version>4.7.1</rocketmq.version>
<flink.version>1.15.0</flink.version>
### Demo Code
`Properties consumerProps = new Properties();
consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, ONS_ADDR);
consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, CONSUMER_GROUP);
consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, CONSUMER_TOPIC);
consumerProps.setProperty(RocketMQConfig.ACCESS_KEY, ONS_KEY);
consumerProps.setProperty(RocketMQConfig.SECRET_KEY, SECRET_KEY);
RocketMQSourceFunction<Map<Object,Object>> source = new RocketMQSourceFunction(
        new SimpleKeyValueDeserializationSchema("id", "data"),
        consumerProps);
source.setStartFromGroupOffsets(OffsetResetStrategy.LATEST);
env.addSource(source).setParallelism(1).print(" =======> ");`
### Error Log
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, message queue: MessageQueue [topic=MQ_INST_1956905707885195_BauvlVUQ%dev-po-parcel-route-add, brokerName=qd-internet-pull-01, queueId=0]
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
    at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:543)
    at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seekToEnd(DefaultLitePullConsumerImpl.java:565)
    at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seekToEnd(DefaultLitePullConsumer.java:297)
    at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.initOffsets(RocketMQSourceFunction.java:396)
    at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.open(RocketMQSourceFunction.java:254)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)