This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit fc793137cc1808b422c2285e5a5353c685e57bac Author: zhu zhengwen <[email protected]> AuthorDate: Fri Jun 21 15:54:16 2019 +0800 create client instance for each sink/source --- src/main/java/org/apache/rocketmq/flink/RocketMQSink.java | 3 ++- src/main/java/org/apache/rocketmq/flink/RocketMQSource.java | 7 ++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java index 41bbcbe..ca6848d 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets; import java.util.LinkedList; import java.util.List; import java.util.Properties; +import java.util.UUID; import org.apache.commons.lang.Validate; import org.apache.flink.configuration.Configuration; @@ -87,7 +88,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null"); producer = new DefaultMQProducer(); - producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask())); + producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID()); RocketMQConfig.buildProducerConfigs(props, producer); batchList = new LinkedList<>(); diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index 14c479b..dcb1d31 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -19,10 +19,7 @@ package org.apache.rocketmq.flink; import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang.Validate; @@ -115,7 +112,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> pullConsumerScheduleService = new MQPullConsumerScheduleService(group); consumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); - consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask())); + consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID()); RocketMQConfig.buildConsumerConfigs(props, consumer); }
