wangyongchun8888 opened a new issue #19:
URL: https://github.com/apache/rocketmq-flink/issues/19


   package com.zto.route.flink.rocketmq;
   
   import com.zto.route.flink.rocketmq.common.util.MetricUtils;
   import com.zto.route.flink.rocketmq.common.util.RetryUtil;
   import com.zto.route.flink.rocketmq.common.watermark.WaterMarkForAll;
   import com.zto.route.flink.rocketmq.common.watermark.WaterMarkPerQueue;
   import org.apache.commons.lang3.Validate;
   import org.apache.flink.api.common.functions.RuntimeContext;
   import org.apache.flink.api.common.state.CheckpointListener;
   import org.apache.flink.api.common.state.ListState;
   import org.apache.flink.api.common.state.ListStateDescriptor;
   import org.apache.flink.api.common.typeinfo.TypeHint;
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.api.java.tuple.Tuple2;
   import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.metrics.Counter;
   import org.apache.flink.metrics.Meter;
   import org.apache.flink.metrics.MeterView;
   import org.apache.flink.metrics.SimpleCounter;
   import org.apache.flink.runtime.state.FunctionInitializationContext;
   import org.apache.flink.runtime.state.FunctionSnapshotContext;
   import 
org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
   import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
   import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
   import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
   import org.apache.flink.util.Collector;
   import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
   import org.apache.rocketmq.client.exception.MQClientException;
   import org.apache.rocketmq.common.message.MessageExt;
   import org.apache.rocketmq.common.message.MessageQueue;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import java.lang.management.ManagementFactory;
   import java.util.*;
   import java.util.concurrent.*;
   
   import com.zto.route.flink.rocketmq.common.util.RocketMQUtils;
   
   public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
           implements CheckpointedFunction, CheckpointListener, 
ResultTypeQueryable<OUT> {
       private static final Logger LOG = 
LoggerFactory.getLogger(RocketMQSource.class);
       private static final long serialVersionUID = 1L;
       // state name
       private static final String OFFSETS_STATE_NAME = 
"topic-partition-offset-states";
       ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
       private RunningChecker runningChecker;
       private volatile Object checkPointLock;
       private transient volatile boolean restored;
       private transient boolean enableCheckpoint;
       private RocketMQCollector rocketMQCollector;
       private List<MessageQueue> assignQueues;
       private ExecutorService executor;
       private DefaultLitePullConsumer consumer;
       private RocketMQDeserializationSchema<OUT> deserializer;
       private Properties props;
       private String topic;
       private String group;
       private Map<MessageQueue, Long> offsetTable = new ConcurrentHashMap<>();
       private ScheduledExecutorService timer;
       // watermark in source
       private WaterMarkPerQueue waterMarkPerQueue;
       private WaterMarkForAll waterMarkForAll;
       private Meter tpsMetric;
   
       public RocketMQSourceV3(RocketMQDeserializationSchema<OUT> deserializer, 
Properties props) {
           this.deserializer = deserializer;
           this.props = props;
       }
   
       @Override
       public void open(Configuration parameters) throws Exception {
           LOG.debug("source run ......");
   
           Validate.notEmpty(props, "Consumer properties can not be empty");
   
           this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
           this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
   
           Validate.notEmpty(topic, "Consumer topic can not be empty");
           Validate.notEmpty(group, "Consumer group can not be empty");
           this.enableCheckpoint = ((StreamingRuntimeContext) 
getRuntimeContext()).isCheckpointingEnabled();
   
           runningChecker = new RunningChecker();
           runningChecker.setRunning(true);
   
           rocketMQCollector = new RocketMQCollector();
   
           waterMarkPerQueue = new WaterMarkPerQueue(5000);
   
           waterMarkForAll = new WaterMarkForAll(5000);
   
           Counter outputCounter = getRuntimeContext().getMetricGroup()
                   .counter(MetricUtils.METRICS_TPS + "_counter", new 
SimpleCounter());
           tpsMetric = getRuntimeContext().getMetricGroup()
                   .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 
60));
           final ThreadFactory threadFactory = new ThreadFactoryBuilder()
                   .setDaemon(true).setNameFormat("rmq-pull-thread-%d").build();
           executor = Executors.newCachedThreadPool(threadFactory);
   
           timer = Executors.newSingleThreadScheduledExecutor();
           // 初始化consumer 并分配队列
           startConsumer();
       }
   
       @Override
       public void run(SourceContext<OUT> context) throws Exception {
           checkPointLock = context.getCheckpointLock();
           timer.scheduleAtFixedRate(() -> {
               context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
               context.emitWatermark(waterMarkForAll.getCurrentWatermark());
           }, 5, 5, TimeUnit.SECONDS);
   
           this.executor.execute(() -> RetryUtil.call(() -> {
   
               while (runningChecker.isRunning()) {
                   List<MessageExt> messages = consumer.poll();
                   for (MessageExt msg : messages) {
                       synchronized (checkPointLock) {
                           deserializer.deserialize(msg, rocketMQCollector);
                           // get record from collector and use sourceContext 
emit it to down task
                           Queue<OUT> records = rocketMQCollector.getRecords();
                           OUT record;
                           while ((record = records.poll()) != null) {
                               context.collectWithTimestamp(record, 
msg.getBornTimestamp());
                           }
                           // record offset to offset table
                           recordBrokerOffset(msg);
   
                           // update max eventTime per queue
                           
waterMarkPerQueue.extractTimestamp(buildMessageQueue(msg), 
msg.getBornTimestamp());
                           
waterMarkForAll.extractTimestamp(msg.getBornTimestamp());
                           tpsMetric.markEvent();
                       }
                   }
               }
               return true;
           }, "RuntimeException"));
   
           awaitTermination();
       }
   
       private void awaitTermination() throws InterruptedException {
           while (runningChecker.isRunning()) {
               Thread.sleep(50);
           }
       }
   
       @Override
       public void cancel() {
           LOG.debug("cancel ...");
           runningChecker.setRunning(false);
   
           if (consumer != null) {
               consumer.shutdown();
           }
   
           if (offsetTable != null) {
               offsetTable.clear();
           }
       }
   
       @Override
       public void notifyCheckpointComplete(long checkpointId) throws Exception 
{
           if (!consumer.isAutoCommit()) {
               LOG.info("commit consumer offset .......");
               consumer.commitSync();
           }
       }
   
   
       @Override
       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
           // called when a snapshot for a checkpoint is requested
           LOG.info("Snapshotting state {} ...", context.getCheckpointId());
           if (!runningChecker.isRunning()) {
               LOG.info("snapshotState() called on closed source; returning 
null.");
               return;
           }
   
           // Discovery topic Route change when snapshot
           RetryUtil.call(() -> {
               List<MessageQueue> newQueues = getAssignQueues();
               Collections.sort(newQueues);
               LOG.debug(getRuntimeContext().getIndexOfThisSubtask() + " Topic 
route is same.");
               if (!assignQueues.equals(newQueues)) {
                   throw new RuntimeException();
               }
               return true;
           }, "RuntimeException due to topic route changed");
   
           unionOffsetStates.clear();
           for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
               LOG.info("snapshot {}, offset {}", entry.getKey(), 
entry.getValue());
               unionOffsetStates.add(Tuple2.of(entry.getKey(), 
entry.getValue()));
           }
       }
   
       @Override
       public void initializeState(FunctionInitializationContext context) 
throws Exception {
           LOG.info("initialize State ...");
   
           unionOffsetStates = 
context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
                   OFFSETS_STATE_NAME, TypeInformation.of(new 
TypeHint<Tuple2<MessageQueue, Long>>() {
           })));
           this.restored = context.isRestored();
           if (restored) {
   
               unionOffsetStates.get().forEach(t -> offsetTable.put(t.f0, 
t.f1));
               LOG.info("Restore from state, {}", offsetTable);
           } else {
               LOG.info("No restored from state ......");
           }
       }
   
       @Override
       public TypeInformation<OUT> getProducedType() {
           return deserializer.getProducedType();
       }
   
       private void recordBrokerOffset(MessageExt message) {
           MessageQueue mq = buildMessageQueue(message);
           long queueOffset = message.getQueueOffset();
           offsetTable.put(mq, queueOffset);
           // 如果没有启用chk,并且没有启用自动提交,那么每次要提交offset
           if (!enableCheckpoint && !consumer.isAutoCommit()) {
               consumer.commitSync();
           }
       }
   
       private MessageQueue buildMessageQueue(MessageExt message) {
           String topic = message.getTopic();
           String brokerName = message.getBrokerName();
           int queueId = message.getQueueId();
           return new MessageQueue(topic, brokerName, queueId);
       }
   
       private void startConsumer() throws MQClientException {
           LOG.info("consumer start ");
           consumer = new DefaultLitePullConsumer(this.group, 
RocketMQConfig.buildAclRPCHook(props));
           String nameServers = 
props.getProperty(RocketMQConfig.NAME_SERVER_ADDR);
           Validate.notEmpty(nameServers);
           consumer.setNamesrvAddr(nameServers);
           consumer.setPollNameServerInterval(RocketMQUtils.getInteger(props,
                   RocketMQConfig.NAME_SERVER_POLL_INTERVAL, 
RocketMQConfig.DEFAULT_NAME_SERVER_POLL_INTERVAL));
           consumer.setHeartbeatBrokerInterval(RocketMQUtils.getInteger(props,
                   RocketMQConfig.BROKER_HEART_BEAT_INTERVAL, 
RocketMQConfig.DEFAULT_BROKER_HEART_BEAT_INTERVAL));
           String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
           int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
           String instanceName = RocketMQUtils.getInstanceName(runtimeName, 
topic, group,
                   String.valueOf(indexOfThisSubTask), 
String.valueOf(System.nanoTime()));
           consumer.setInstanceName(instanceName);
   //        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, 
RocketMQConfig.DEFAULT_CONSUMER_TAG);
   //        consumer.subscribe(this.topic, tag);
           boolean autoCommit = RocketMQUtils.getBoolean(props, 
RocketMQConfig.OFFSET_AUTO_COMMIT, false);
           consumer.setAutoCommit(autoCommit);
           consumer.start();
           consumer.assign(getAssignQueues());
           // 从offsetTable中移除不在本task 
中分配的队列,做snapshot,并从恢复的offset中,seek到上次的offset
           removeUnAssignQueues();
   
           // 每个MessageQueue指定offset消费
           perQueueSeekToSpecialOffset();
       }
   
       private List<MessageQueue> getAssignQueues() throws MQClientException {
           final RuntimeContext ctx = getRuntimeContext();
           int taskNumber = ctx.getNumberOfParallelSubtasks();
           int taskIndex = ctx.getIndexOfThisSubtask();
           Collection<MessageQueue> totalQueues = 
consumer.fetchMessageQueues(this.topic);
           List<MessageQueue> shouldAssignQueues = 
RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
           assignQueues = shouldAssignQueues;
           return shouldAssignQueues;
       }
   
       // 从offsetTable中移除此consumer未消费的信息
       private void removeUnAssignQueues() throws MQClientException {
           // offset table 开始从状态恢复,保存的是全部queue的信息,移除多余的
           this.offsetTable.forEach((k, v) -> {
               if (!this.assignQueues.contains(k)) {
                   this.offsetTable.remove(k);
               }
           });
       }
   
       // 根据不同策略,指定不同offset
       private void perQueueSeekToSpecialOffset() throws MQClientException {
           for (MessageQueue mq : this.assignQueues) {
               if (this.offsetTable.containsKey(mq)) {
                   long offset = this.offsetTable.get(mq) + 1;
                   LOG.info("consumer seek {} from state, offset {}", mq, 
offset);
                   this.consumer.seek(mq, offset);
               } else {
                   // 如果是从状态恢复,但是找不到,那么这个offset 就是新加入的
                   if (this.restored) {
                       this.consumer.seekToBegin(mq);
                   } else {
                       // 不是restored,那么就是直接重启,根据提供的策略选择offset
                       String offsetFrom = 
props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, 
Offset.LATEST.toString());
                       Offset initialOffsetFrom = Offset.valueOf(offsetFrom);
                       switch (initialOffsetFrom) {
                           case EARLIEST:
                               consumer.seekToBegin(mq);
                               LOG.info("{} seek to begin ......", mq);
                               break;
                           case LATEST:
                               consumer.seekToEnd(mq);
                               LOG.info("{} seek to end ......", mq);
                               break;
                           case STORE:
                               break;
                           case TIMESTAMP:
                               LOG.info("{} seek to timestamp {} ......", mq, 
System.currentTimeMillis());
                               consumer.seek(mq, System.currentTimeMillis());
                               break;
                           default:
                               break;
                       }
                   }
               }
           }
       }
   
       // 用来保存值
       private class RocketMQCollector implements Collector<OUT> {
           private final Queue<OUT> records = new ArrayDeque<>();
           private boolean endOfStreamSignalled = false;
   
           @Override
           public void collect(OUT record) {
               // do not emit subsequent elements if the end of the stream 
reached
               if (endOfStreamSignalled || deserializer.isEndOfStream(record)) {
                   endOfStreamSignalled = true;
                   return;
               }
               records.add(record);
           }
   
           public Queue<OUT> getRecords() {
               return records;
           }
   
           public boolean isEndOfStreamSignalled() {
               return endOfStreamSignalled;
           }
   
           public void setEndOfStreamSignalled(boolean endOfStreamSignalled) {
               this.endOfStreamSignalled = endOfStreamSignalled;
           }
   
           @Override
           public void close() {
   
           }
       }
   }
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to