This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit b5c05e0f5ed71ba1f4aa9eae55136995df724f9f Author: xlzl <[email protected]> AuthorDate: Mon Jun 3 11:05:31 2019 +0800 flink-rocketmq-sink , producer send message set delay level (optional… (#237) --- .../org/apache/rocketmq/flink/RocketMQConfig.java | 27 +++++++++++++++++++--- .../org/apache/rocketmq/flink/RocketMQSink.java | 20 ++++++++++++---- .../apache/rocketmq/flink/RocketMQSinkTest.java | 1 + .../flink/example/RocketMQFlinkExample.java | 6 ++++- 4 files changed, 46 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java index 8ec760b..5b43b31 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java @@ -34,7 +34,7 @@ import static org.apache.rocketmq.flink.RocketMQUtils.getInteger; * RocketMQConfig for Consumer/Producer. */ public class RocketMQConfig { - // common + // Server Config public static final String NAME_SERVER_ADDR = "nameserver.address"; // Required public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval"; @@ -44,7 +44,7 @@ public class RocketMQConfig { public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds - // producer + // Producer related config public static final String PRODUCER_GROUP = "producer.group"; public static final String PRODUCER_RETRY_TIMES = "producer.retry.times"; @@ -54,7 +54,7 @@ public class RocketMQConfig { public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds - // consumer + // Consumer related config public static final String CONSUMER_GROUP = "consumer.group"; // Required public static final String CONSUMER_TOPIC = "consumer.topic"; // Required @@ -80,6 +80,27 @@ public class RocketMQConfig { public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = "consumer.delay.when.message.not.found"; public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 10; + public static final String MSG_DELAY_LEVEL = "msg.delay.level"; + public static final int MSG_DELAY_LEVEL00 = 0; // no delay + public static final int MSG_DELAY_LEVEL01 = 1; // 1s + public static final int MSG_DELAY_LEVEL02 = 2; // 5s + public static final int MSG_DELAY_LEVEL03 = 3; // 10s + public static final int MSG_DELAY_LEVEL04 = 4; // 30s + public static final int MSG_DELAY_LEVEL05 = 5; // 1min + public static final int MSG_DELAY_LEVEL06 = 6; // 2min + public static final int MSG_DELAY_LEVEL07 = 7; // 3min + public static final int MSG_DELAY_LEVEL08 = 8; // 4min + public static final int MSG_DELAY_LEVEL09 = 9; // 5min + public static final int MSG_DELAY_LEVEL10 = 10; // 6min + public static final int MSG_DELAY_LEVEL11 = 11; // 7min + public static final int MSG_DELAY_LEVEL12 = 12; // 8min + public static final int MSG_DELAY_LEVEL13 = 13; // 9min + public static final int MSG_DELAY_LEVEL14 = 14; // 10min + public static final int MSG_DELAY_LEVEL15 = 15; // 20min + public static final int MSG_DELAY_LEVEL16 = 16; // 30min + public static final int MSG_DELAY_LEVEL17 = 17; // 1h + public static final int MSG_DELAY_LEVEL18 = 18; // 2h + /** * Build Producer Configs. * @param props Properties diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java index 65274af..41bbcbe 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java @@ -62,10 +62,22 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint private int batchSize = 1000; private List<Message> batchList; + private int messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00; + public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) { this.serializationSchema = schema; this.topicSelector = topicSelector; this.props = props; + + if (this.props != null) { + this.messageDeliveryDelayLevel = RocketMQUtils.getInteger(this.props, RocketMQConfig.MSG_DELAY_LEVEL, + RocketMQConfig.MSG_DELAY_LEVEL00); + if (this.messageDeliveryDelayLevel < RocketMQConfig.MSG_DELAY_LEVEL00) { + this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00; + } else if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL18) { + this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL18; + } + } } @Override @@ -105,7 +117,6 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint } if (async) { - // async sending try { producer.send(msg, new SendCallback() { @Override @@ -124,7 +135,6 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint LOG.error("Async send message failure!", e); } } else { - // sync sending, will return a SendResult try { SendResult result = producer.send(msg); LOG.debug("Sync send message result: {}", result); @@ -134,7 +144,6 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint } } - // Mapping: from storm tuple -> rocketmq Message private Message prepareMessage(IN input) { String topic = topicSelector.getTopic(input); String tag = topicSelector.getTag(input) != null ? topicSelector.getTag(input) : ""; @@ -147,6 +156,9 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint Validate.notNull(value, "the message body is null"); Message msg = new Message(topic, tag, key, value); + if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL00) { + msg.setDelayTimeLevel(this.messageDeliveryDelayLevel); + } return msg; } @@ -191,6 +203,6 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint @Override public void initializeState(FunctionInitializationContext context) throws Exception { - // nothing to do + // Nothing to do } } diff --git a/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java b/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java index ec844f2..74a10b0 100644 --- a/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java +++ b/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java @@ -46,6 +46,7 @@ public class RocketMQSinkTest { KeyValueSerializationSchema serializationSchema = new SimpleKeyValueSerializationSchema("id", "name"); TopicSelector topicSelector = new DefaultTopicSelector("tpc"); Properties props = new Properties(); + props.setProperty(RocketMQConfig.MSG_DELAY_LEVEL,String.valueOf(RocketMQConfig.MSG_DELAY_LEVEL04)); rocketMQSink = new RocketMQSink(serializationSchema, topicSelector, props); producer = mock(DefaultMQProducer.class); diff --git a/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java b/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java index b2a4034..f4f654e 100644 --- a/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java +++ b/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java @@ -46,6 +46,10 @@ public class RocketMQFlinkExample { Properties producerProps = new Properties(); producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876"); + int msgDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL05; + producerProps.setProperty(RocketMQConfig.MSG_DELAY_LEVEL,String.valueOf(msgDelayLevel)); + // TimeDelayLevel is not supported for batching + boolean batchFlag = msgDelayLevel <= 0; env.addSource(new RocketMQSource(new SimpleKeyValueDeserializationSchema("id", "address"), consumerProps)) .name("rocketmq-source") @@ -63,7 +67,7 @@ public class RocketMQFlinkExample { .name("upper-processor") .setParallelism(2) .addSink(new RocketMQSink(new SimpleKeyValueSerializationSchema("id", "province"), - new DefaultTopicSelector("flink-sink2"), producerProps).withBatchFlushOnCheckpoint(true)) + new DefaultTopicSelector("flink-sink2"), producerProps).withBatchFlushOnCheckpoint(batchFlag)) .name("rocketmq-sink") .setParallelism(2);
