This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 12f0d0c710ea91b0767b84f95245f3eb44c29ce8 Author: Xin Wang <[email protected]> AuthorDate: Fri Mar 23 21:47:33 2018 +0800 [ROCKETMQ-82] RocketMQ-Flink Integration (#45) --- README.md | 147 +++++++++ pom.xml | 174 +++++++++++ .../org/apache/rocketmq/flink/RocketMQConfig.java | 134 +++++++++ .../org/apache/rocketmq/flink/RocketMQSink.java | 187 ++++++++++++ .../org/apache/rocketmq/flink/RocketMQSource.java | 331 +++++++++++++++++++++ .../org/apache/rocketmq/flink/RocketMQUtils.java | 36 +++ .../org/apache/rocketmq/flink/RunningChecker.java | 33 ++ .../common/selector/DefaultTopicSelector.java | 43 +++ .../flink/common/selector/SimpleTopicSelector.java | 73 +++++ .../flink/common/selector/TopicSelector.java | 28 ++ .../KeyValueDeserializationSchema.java | 27 ++ .../serialization/KeyValueSerializationSchema.java | 28 ++ .../SimpleKeyValueDeserializationSchema.java | 66 ++++ .../SimpleKeyValueSerializationSchema.java | 63 ++++ .../apache/rocketmq/flink/RocketMQSinkTest.java | 75 +++++ .../apache/rocketmq/flink/RocketMQSourceTest.java | 121 ++++++++ .../java/org/apache/rocketmq/flink/TestUtils.java | 33 ++ .../common/selector/DefaultTopicSelectorTest.java | 37 +++ .../common/selector/SimpleTopicSelectorTest.java | 49 +++ .../SimpleKeyValueSerializationSchemaTest.java | 42 +++ .../rocketmq/flink/example/ConsumerTest.java | 54 ++++ .../rocketmq/flink/example/ProducerTest.java | 57 ++++ .../flink/example/RocketMQFlinkExample.java | 76 +++++ style/copyright/Apache.xml | 24 ++ style/copyright/profiles_settings.xml | 64 ++++ style/rmq_checkstyle.xml | 135 +++++++++ style/rmq_codeStyle.xml | 157 ++++++++++ 27 files changed, 2294 insertions(+) diff --git a/README.md b/README.md new file mode 100644 index 0000000..ab1d456 --- /dev/null +++ b/README.md @@ -0,0 +1,147 @@ +# RocketMQ-Flink + +RocketMQ integration for [Apache Flink](https://flink.apache.org/). 
This module includes the RocketMQ source and sink that allow a Flink job to either read messages from RocketMQ topics or write messages into a topic. + + +## RocketMQSource +To use the `RocketMQSource`, you construct an instance of it by specifying a KeyValueDeserializationSchema instance and a Properties instance which includes the RocketMQ configs. +`RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props)` +The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly-once reliability guarantees when checkpoints are enabled. +Otherwise, the source doesn't provide any reliability guarantees. + +### KeyValueDeserializationSchema +The main API for deserializing message keys and values is the `org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema` interface. +`rocketmq-flink` includes a general purpose `KeyValueDeserializationSchema` implementation called `SimpleKeyValueDeserializationSchema`. + +```java +public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>, Serializable { + T deserializeKeyAndValue(byte[] key, byte[] value); +} +``` + +## RocketMQSink +To use the `RocketMQSink`, you construct an instance of it by specifying KeyValueSerializationSchema & TopicSelector instances and a Properties instance which includes the RocketMQ configs. +`RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props)` +The RocketMQSink provides at-least-once reliability guarantees when checkpoints are enabled and `withBatchFlushOnCheckpoint(true)` is set. +Otherwise, the sink's reliability guarantees depend on the RocketMQ producer's retry policy; in this case, messages are sent synchronously by default, +but you can send them asynchronously by invoking `withAsync(true)`. + +### KeyValueSerializationSchema +The main API for serializing message keys and values is the `org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema` interface.
+`rocketmq-flink` includes a general purpose `KeyValueSerializationSchema` implementation called `SimpleKeyValueSerializationSchema`. + +```java +public interface KeyValueSerializationSchema<T> extends Serializable { + + byte[] serializeKey(T tuple); + + byte[] serializeValue(T tuple); +} +``` + +### TopicSelector +The main API for selecting the topic and tag of each message is the `org.apache.rocketmq.flink.common.selector.TopicSelector` interface. +`rocketmq-flink` includes general purpose `TopicSelector` implementations called `DefaultTopicSelector` and `SimpleTopicSelector`. + +```java +public interface TopicSelector<T> extends Serializable { + + String getTopic(T tuple); + + String getTag(T tuple); +} +``` + +## Examples +The following example receives messages from RocketMQ brokers, processes them, and sends the results back to a broker. + + ```java +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // enable checkpoint + env.enableCheckpointing(3000); + + Properties consumerProps = new Properties(); + consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876"); + consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "c002"); + consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "flink-source2"); + + Properties producerProps = new Properties(); + producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876"); + + env.addSource(new RocketMQSource(new SimpleKeyValueDeserializationSchema("id", "address"), consumerProps)) + .name("rocketmq-source") + .setParallelism(2) + .process(new ProcessFunction<Map, Map>() { + @Override + public void processElement(Map in, Context ctx, Collector<Map> out) throws Exception { + HashMap result = new HashMap(); + result.put("id", in.get("id")); + String[] arr = in.get("address").toString().split("\\s+"); + result.put("province", arr[arr.length-1]); + out.collect(result); + } + }) + .name("upper-processor") + .setParallelism(2) + .addSink(new RocketMQSink(new
SimpleKeyValueSerializationSchema("id", "province"), + new DefaultTopicSelector("flink-sink2"), producerProps).withBatchFlushOnCheckpoint(true)) + .name("rocketmq-sink") + .setParallelism(2); + + try { + env.execute("rocketmq-flink-example"); + } catch (Exception e) { + e.printStackTrace(); + } + ``` + +## Configurations +The following configurations are all from the class `org.apache.rocketmq.flink.RocketMQConfig`. All intervals, timeouts and delays are in milliseconds. + +### Producer Configurations +| NAME | DESCRIPTION | DEFAULT | +| ------------- |:-------------:|:------:| +| nameserver.address | name server address *Required* | null | +| nameserver.poll.interval | name server poll topic info interval | 30000 | +| brokerserver.heartbeat.interval | broker server heartbeat interval | 30000 | +| producer.group | producer group | `UUID.randomUUID().toString()` | +| producer.retry.times | producer send messages retry times | 3 | +| producer.timeout | producer send messages timeout | 3000 | + + +### Consumer Configurations +| NAME | DESCRIPTION | DEFAULT | +| ------------- |:-------------:|:------:| +| nameserver.address | name server address *Required* | null | +| nameserver.poll.interval | name server poll topic info interval | 30000 | +| brokerserver.heartbeat.interval | broker server heartbeat interval | 30000 | +| consumer.group | consumer group *Required* | null | +| consumer.topic | consumer topic *Required* | null | +| consumer.tag | consumer topic tag | * | +| consumer.offset.reset.to | what to do when there is no initial offset on the server: `latest`, `earliest` or `timestamp` | latest | +| consumer.offset.from.timestamp | the timestamp to start consuming from when `consumer.offset.reset.to=timestamp` | `System.currentTimeMillis()` | +| consumer.offset.persist.interval | auto commit offset interval | 5000 | +| consumer.pull.thread.pool.size | consumer pull thread pool size | 20 | +| consumer.batch.size | consumer messages batch size | 32 | +| consumer.delay.when.message.not.found | the delay time when messages were not found | 10 | + + +##
License + +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..ecb3436 --- /dev/null +++ b/pom.xml @@ -0,0 +1,174 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-flink</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>jar</packaging> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <!--maven properties --> + <maven.test.skip>false</maven.test.skip> + <maven.javadoc.skip>false</maven.javadoc.skip> + <!-- compiler settings properties --> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + <rocketmq.version>4.2.0</rocketmq.version> + <flink.version>1.4.0</flink.version> + <commons-lang.version>2.5</commons-lang.version> + <scala.binary.version>2.11</scala.binary.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-common</artifactId> + <version>${rocketmq.version}</version> + <exclusions> + <exclusion> + 
<groupId>io.netty</groupId> + <artifactId>netty-tcnative</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>${commons-lang.version}</version> + </dependency> + + <!--test --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + <version>4.12</version> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>1.5.5</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <version>1.5.5</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-namesrv</artifactId> + <version>${rocketmq.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-broker</artifactId> + <version>${rocketmq.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.5.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <encoding>UTF-8</encoding> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.12.4</version> + <configuration> + <skipTests>${maven.test.skip}</skipTests> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <version>0.12</version> + <configuration> + <excludes> + <exclude>README.md</exclude> + </excludes> + </configuration> + </plugin> 
+ <plugin> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <executions> + <execution> + <id>verify</id> + <phase>verify</phase> + <configuration> + <configLocation>style/rmq_checkstyle.xml</configLocation> + <encoding>UTF-8</encoding> + <consoleOutput>true</consoleOutput> + <failsOnError>true</failsOnError> + <includeTestSourceDirectory>false</includeTestSourceDirectory> + <includeTestResources>false</includeTestResources> + </configuration> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java new file mode 100644 index 0000000..8ec760b --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.flink; + +import java.util.Properties; +import java.util.UUID; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.Validate; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; + +import static org.apache.rocketmq.flink.RocketMQUtils.getInteger; + +/** + * Configuration keys, default values and helpers that apply a {@link Properties} + * instance to RocketMQ producer and pull-consumer clients. + * + * <p>Interval and timeout values are expressed in milliseconds (e.g. 30000 = 30 seconds). + */ +public class RocketMQConfig { + // common + public static final String NAME_SERVER_ADDR = "nameserver.address"; // Required + + public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval"; + public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds + + public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval"; + public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds + + + // producer + public static final String PRODUCER_GROUP = "producer.group"; + + public static final String PRODUCER_RETRY_TIMES = "producer.retry.times"; + public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3; + + public static final String PRODUCER_TIMEOUT = "producer.timeout"; + public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds + + + // consumer + public static final String CONSUMER_GROUP = "consumer.group"; // Required + + public static final String CONSUMER_TOPIC = "consumer.topic"; // Required + + public static final String CONSUMER_TAG = "consumer.tag"; + public static final String DEFAULT_CONSUMER_TAG = "*"; + + public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to"; + public static final String CONSUMER_OFFSET_LATEST = "latest"; + public static final String CONSUMER_OFFSET_EARLIEST = "earliest"; + public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp"; + public static final
String CONSUMER_OFFSET_FROM_TIMESTAMP = "consumer.offset.from.timestamp"; + + public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval"; + public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds + + public static final String CONSUMER_PULL_POOL_SIZE = "consumer.pull.thread.pool.size"; + public static final int DEFAULT_CONSUMER_PULL_POOL_SIZE = 20; + + public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size"; + public static final int DEFAULT_CONSUMER_BATCH_SIZE = 32; + + public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = "consumer.delay.when.message.not.found"; + public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 10; + + /** + * Applies the producer settings found in {@code props} to {@code producer}. + * + * <p>The producer group falls back to a random UUID when {@link #PRODUCER_GROUP} + * is missing or empty. {@link #PRODUCER_RETRY_TIMES} is used for both the sync + * and the async send retry counts. + * + * @param props the user supplied configuration + * @param producer the producer to configure + * @throws IllegalArgumentException if {@link #NAME_SERVER_ADDR} is missing or empty + */ + public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) { + buildCommonConfigs(props, producer); + + String group = props.getProperty(PRODUCER_GROUP); + if (StringUtils.isEmpty(group)) { + group = UUID.randomUUID().toString(); + } + // Use the resolved group directly: props.getProperty(PRODUCER_GROUP, group) would + // return "" instead of the UUID fallback when the key is present but empty. + producer.setProducerGroup(group); + + producer.setRetryTimesWhenSendFailed(getInteger(props, + PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES)); + producer.setRetryTimesWhenSendAsyncFailed(getInteger(props, + PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES)); + producer.setSendMsgTimeout(getInteger(props, + PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT)); + } + + /** + * Applies the consumer settings found in {@code props} to {@code consumer}: the + * common client settings, the CLUSTERING message model and the offset persist interval. + * + * @param props the user supplied configuration + * @param consumer the pull consumer to configure + * @throws IllegalArgumentException if {@link #NAME_SERVER_ADDR} is missing or empty + */ + public static void buildConsumerConfigs(Properties props, DefaultMQPullConsumer consumer) { + buildCommonConfigs(props, consumer); + + consumer.setMessageModel(MessageModel.CLUSTERING); + + consumer.setPersistConsumerOffsetInterval(getInteger(props, + CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL)); + } +
+ /** + * Applies the settings shared by producers and consumers: the mandatory name + * server address plus the name server poll and broker heartbeat intervals. + * + * @param props the user supplied configuration + * @param client the RocketMQ client (producer or consumer) to configure + * @throws IllegalArgumentException if {@link #NAME_SERVER_ADDR} is missing or empty + */ + public static void buildCommonConfigs(Properties props, ClientConfig client) { + String nameServers = props.getProperty(NAME_SERVER_ADDR); + Validate.notEmpty(nameServers, "Name server address can not be empty"); + client.setNamesrvAddr(nameServers); + + client.setPollNameServerInterval(getInteger(props, + NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL)); + client.setHeartbeatBrokerInterval(getInteger(props, + BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL)); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java new file mode 100644 index 0000000..e79d1b4 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.rocketmq.flink; + +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang.Validate; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.flink.common.selector.TopicSelector; +import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The RocketMQSink provides at-least-once reliability guarantees when + * checkpoints are enabled and batchFlushOnCheckpoint(true) is set. + * Otherwise, the sink's reliability guarantees depend on the RocketMQ producer's retry policy.
+ * + * <p>With batch flushing enabled, records are buffered in memory and sent as one + * batch whenever a checkpoint is taken and when the sink is closed. + */ +public class RocketMQSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RocketMQSink.class); + + private transient DefaultMQProducer producer; + private boolean async; // false by default + + private Properties props; + private TopicSelector<IN> topicSelector; + private KeyValueSerializationSchema<IN> serializationSchema; + + private boolean batchFlushOnCheckpoint; // false by default + private List<Message> batchList; + + /** + * Creates a sink. The arguments are validated in {@link #open(Configuration)}. + * + * @param schema serializes the message key and body of each record + * @param topicSelector chooses the topic and tag of each record + * @param props producer configuration, see {@link RocketMQConfig} + */ + public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) { + this.serializationSchema = schema; + this.topicSelector = topicSelector; + this.props = props; + } + + /** + * Validates the configuration, then creates and starts the producer. Batch flushing + * is switched off (with a warning) when checkpointing is not enabled. + */ + @Override + public void open(Configuration parameters) throws Exception { + Validate.notEmpty(props, "Producer properties can not be empty"); + Validate.notNull(topicSelector, "TopicSelector can not be null"); + Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null"); + + producer = new DefaultMQProducer(); + producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask())); + RocketMQConfig.buildProducerConfigs(props, producer); + + batchList = new LinkedList<>(); + + if (batchFlushOnCheckpoint && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) { + LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled.
Disabling flushing."); + batchFlushOnCheckpoint = false; + } + + try { + producer.start(); + } catch (MQClientException e) { + throw new RuntimeException(e); + } + } + + /** + * Converts {@code input} into a message and either buffers it (batch flushing) or + * sends it right away, asynchronously or synchronously. In the non-batch modes a + * failed send is only logged, so the record is dropped. + */ + @Override + public void invoke(IN input, Context context) throws Exception { + Message msg = prepareMessage(input); + + if (batchFlushOnCheckpoint) { + batchList.add(msg); + return; + } + + if (async) { + // async sending + try { + producer.send(msg, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + LOG.debug("Async send message success! result: {}", sendResult); + } + + @Override + public void onException(Throwable throwable) { + if (throwable != null) { + LOG.error("Async send message failure!", throwable); + } + } + }); + } catch (Exception e) { + LOG.error("Async send message failure!", e); + } + } else { + // sync sending, will return a SendResult + try { + SendResult result = producer.send(msg); + LOG.debug("Sync send message result: {}", result); + } catch (Exception e) { + LOG.error("Sync send message failure!", e); + } + } + } + + /** + * Maps a Flink record to a RocketMQ {@link Message} using the configured topic + * selector and key/value serialization schema. A missing tag or key becomes "". + */ + private Message prepareMessage(IN input) { + String topic = topicSelector.getTopic(input); + String selectedTag = topicSelector.getTag(input); + String tag = selectedTag != null ? selectedTag : ""; + + byte[] k = serializationSchema.serializeKey(input); + String key = k != null ?
new String(k, StandardCharsets.UTF_8) : ""; + byte[] value = serializationSchema.serializeValue(input); + + Validate.notNull(topic, "the message topic is null"); + Validate.notNull(value, "the message body is null"); + + Message msg = new Message(topic, tag, key, value); + return msg; + } + + /** + * Enables asynchronous sending; only used when batch flushing is off. + * + * @return this sink, for chaining + */ + public RocketMQSink<IN> withAsync(boolean async) { + this.async = async; + return this; + } + + /** + * Buffers records and sends them on every checkpoint and on close. Ignored (with a + * warning) when checkpointing is not enabled. + * + * @return this sink, for chaining + */ + public RocketMQSink<IN> withBatchFlushOnCheckpoint(boolean batchFlushOnCheckpoint) { + this.batchFlushOnCheckpoint = batchFlushOnCheckpoint; + return this; + } + + /** + * Flushes any buffered messages and shuts the producer down. The producer is shut + * down even if the final flush throws. + */ + @Override + public void close() throws Exception { + if (producer != null) { + try { + flushSync(); + } finally { + // always release the client, even when the final flush fails + producer.shutdown(); + } + } + } + + /** + * Sends all buffered messages as one synchronous batch and clears the buffer. Does + * nothing unless batch flushing is enabled. Send failures propagate to the caller. + */ + private void flushSync() throws Exception { + // batchList is still null if open() failed after creating the producer + if (batchFlushOnCheckpoint && batchList != null) { + synchronized (batchList) { + if (!batchList.isEmpty()) { + // NOTE(review): RocketMQ batch sends presumably require a single topic per batch; + // confirm when the TopicSelector can return different topics. + producer.send(batchList); + batchList.clear(); + } + } + } + } + + /** Flushes buffered messages synchronously as part of taking a checkpoint. */ + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + flushSync(); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + // nothing to do: buffered messages are not kept in Flink state + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java new file mode 100644 index 0000000..2dc8fd5 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.lang.Validate; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullTaskCallback; +import org.apache.rocketmq.client.consumer.PullTaskContext; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import static org.apache.rocketmq.flink.RocketMQConfig.CONSUMER_OFFSET_EARLIEST; +import static org.apache.rocketmq.flink.RocketMQConfig.CONSUMER_OFFSET_LATEST; +import static org.apache.rocketmq.flink.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP; +import static org.apache.rocketmq.flink.RocketMQUtils.getInteger; +import static org.apache.rocketmq.flink.RocketMQUtils.getLong; + +/** + * The RocketMQSource is based on RocketMQ pull consumer mode, + * and provides exactly once reliability guarantees when checkpoints are enabled. + * Otherwise, the source doesn't provide any reliability guarantees. + */ +public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> + implements CheckpointedFunction, ResultTypeQueryable<OUT> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RocketMQSource.class); + + private transient MQPullConsumerScheduleService pullConsumerScheduleService; + private DefaultMQPullConsumer consumer; + + private KeyValueDeserializationSchema<OUT> schema; + + private RunningChecker runningChecker; + + private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates; + private Map<MessageQueue, Long> offsetTable; + private Map<MessageQueue, Long> restoredOffsets; + + private Properties props; + private String topic; + private String group; + + private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states"; + + private transient volatile boolean restored; + + public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) { + this.schema = schema; + this.props = props; + } + + @Override + public void open(Configuration parameters) throws Exception { + LOG.debug("source open...."); + Validate.notEmpty(props, "Consumer properties can not be empty"); + Validate.notNull(schema, "KeyValueDeserializationSchema can not be null"); + + this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC); + this.group = 
props.getProperty(RocketMQConfig.CONSUMER_GROUP); + + Validate.notEmpty(topic, "Consumer topic can not be empty"); + Validate.notEmpty(group, "Consumer group can not be empty"); + + if (offsetTable == null) { + offsetTable = new ConcurrentHashMap<>(); + } + if (restoredOffsets == null) { + restoredOffsets = new ConcurrentHashMap<>(); + } + + runningChecker = new RunningChecker(); + + pullConsumerScheduleService = new MQPullConsumerScheduleService(group); + consumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); + + consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask())); + RocketMQConfig.buildConsumerConfigs(props, consumer); + } + + /** Reads the pull settings from props, registers a pull callback for the topic, starts the pull scheduler and then blocks (polling every 50 ms) until the running flag is cleared. */ @Override + public void run(SourceContext context) throws Exception { + LOG.debug("source run...."); + // The lock that guarantees that record emission and state updates are atomic, + // from the view of taking a checkpoint. + final Object lock = context.getCheckpointLock(); + + int delayWhenMessageNotFound = getInteger(props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND, + RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND); + + String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG); + + int pullPoolSize = getInteger(props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE, + RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE); + + int pullBatchSize = getInteger(props, RocketMQConfig.CONSUMER_BATCH_SIZE, + RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE); + + pullConsumerScheduleService.setPullThreadNums(pullPoolSize); + pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() { + + /* One pull round for a queue: emit FOUND messages, advance the stored offset and pick the delay before the next round. 
*/ @Override + public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) { + try { + long offset = getMessageQueueOffset(mq); + if (offset < 0) { + return; + } + + PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize); + boolean found = false; + switch (pullResult.getPullStatus()) { + case FOUND: + List<MessageExt> messages 
= pullResult.getMsgFoundList(); + for (MessageExt msg : messages) { + byte[] key = msg.getKeys() != null ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null; + byte[] value = msg.getBody(); + OUT data = schema.deserializeKeyAndValue(key, value); + // emit the record and advance its queue offset under one lock hold, so a checkpoint never captures an emitted record without its offset + synchronized (lock) { + context.collectWithTimestamp(data, msg.getBornTimestamp()); + putMessageQueueOffset(mq, Math.addExact(msg.getQueueOffset(), 1L)); + } + } + found = true; + break; + case NO_MATCHED_MSG: + LOG.debug("No matched message after offset {} for queue {}", offset, mq); + break; + case NO_NEW_MSG: + break; + case OFFSET_ILLEGAL: + LOG.warn("Offset {} is illegal for queue {}", offset, mq); + break; + default: + break; + } + + synchronized (lock) { + putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); + } + + if (found) { + pullTaskContext.setPullNextDelayTimeMillis(0); // no delay when messages were found + } else { + pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + try { + pullConsumerScheduleService.start(); + } catch (MQClientException e) { + throw new RuntimeException(e); + } + + runningChecker.setRunning(true); + + awaitTermination(); + + } + + /** Blocks the calling thread, polling the running flag every 50 ms. */ private void awaitTermination() throws InterruptedException { + while (runningChecker.isRunning()) { + Thread.sleep(50); + } + } + + /** Returns the next offset to pull for mq: the cached value, else the restored checkpoint value, else consumer.fetchConsumeOffset(mq, false), else the position chosen by CONSUMER_OFFSET_RESET_TO (default latest). The result is cached in offsetTable. 
*/ private long getMessageQueueOffset(MessageQueue mq) throws MQClientException { + Long offset = offsetTable.get(mq); + // restoredOffsets holds the union state of every subtask; looking up + // only the queue being pulled picks the entries for queues assigned to us + if (restored && offset == null) { + offset = restoredOffsets.get(mq); + } + if (offset == null) { + offset = consumer.fetchConsumeOffset(mq, false); + if (offset < 0) { + String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST); + switch (initialOffset) { + case CONSUMER_OFFSET_EARLIEST: + offset = consumer.minOffset(mq); + break; + case 
CONSUMER_OFFSET_LATEST: + offset = consumer.maxOffset(mq); + break; + case CONSUMER_OFFSET_TIMESTAMP: + offset = consumer.searchOffset(mq, getLong(props, + RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis())); + break; + default: + throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO."); + } + } + offsetTable.put(mq, offset); + } + + return offsetTable.get(mq); + } + + /** Records offset as the next position to pull for mq, both locally and in the consumer via updateConsumeOffset. */ private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException { + offsetTable.put(mq, offset); + consumer.updateConsumeOffset(mq, offset); + } + + /** Stops the run loop, shuts down the pull scheduler and drops cached offsets. Fields are null-checked in case open() did not get far enough to assign them (close() delegates here). */ @Override + public void cancel() { + LOG.debug("cancel ..."); + if (runningChecker != null) { runningChecker.setRunning(false); } + + if (pullConsumerScheduleService != null) { + pullConsumerScheduleService.shutdown(); + } + + if (offsetTable != null) { offsetTable.clear(); } + if (restoredOffsets != null) { restoredOffsets.clear(); } + } + + /** Runs cancel(), then always calls super.close() even if cancel() throws. */ @Override + public void close() throws Exception { + LOG.debug("close ..."); + // pretty much the same logic as cancelling + try { + cancel(); + } finally { + super.close(); + } + } + + /** Replaces the union list state with the current per-queue offsets; leaves the state untouched when the source is not running. 
*/ @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + // called when a snapshot for a checkpoint is requested + + if (!runningChecker.isRunning()) { + LOG.debug("snapshotState() called on closed source; returning null."); + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotting state {} ...", context.getCheckpointId()); + } + + unionOffsetStates.clear(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}", + offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp()); + } + + for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) { + unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue())); + } + } + + /** Registers the union offset list state and, on restore, copies every restored (queue, offset) pair into restoredOffsets. */ @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + // called every time the user-defined function is initialized, + // be that when the function 
is first initialized or be that + // when the function is actually recovering from an earlier checkpoint. + // Given this, initializeState() is not only the place where different types of state are initialized, + // but also where state recovery logic is included. + LOG.debug("initialize State ..."); + + this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>( + OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() { }))); + + this.restored = context.isRestored(); + + if (restored) { + if (restoredOffsets == null) { + restoredOffsets = new ConcurrentHashMap<>(); + } + for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) { + // the union state holds the offsets of every subtask; keep all of them here, + // getMessageQueueOffset() only looks up the queues this subtask actually pulls + restoredOffsets.put(mqOffsets.f0, mqOffsets.f1); + } + LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets); + } else { + LOG.info("No restore state for the consumer."); + } + } + + /** Delegates to the deserialization schema. */ @Override + public TypeInformation<OUT> getProducedType() { + return schema.getProducedType(); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQUtils.java b/src/main/java/org/apache/rocketmq/flink/RocketMQUtils.java new file mode 100644 index 0000000..9ca1de2 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQUtils.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.util.Properties; + +/** Typed accessors for {@link Properties} values with a default; values are trimmed before parsing because Properties.load keeps trailing whitespace. */ public final class RocketMQUtils { + + /** Returns the int value of key, or defaultValue when absent; throws NumberFormatException if the value is not an int. */ public static int getInteger(Properties props, String key, int defaultValue) { + return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)).trim()); + } + + /** Returns the long value of key, or defaultValue when absent; throws NumberFormatException if the value is not a long. */ public static long getLong(Properties props, String key, long defaultValue) { + return Long.parseLong(props.getProperty(key, String.valueOf(defaultValue)).trim()); + } + + /** Returns true only if the value of key equals "true" ignoring case; uses defaultValue when absent. */ public static boolean getBoolean(Properties props, String key, boolean defaultValue) { + return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)).trim()); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/RunningChecker.java b/src/main/java/org/apache/rocketmq/flink/RunningChecker.java new file mode 100644 index 0000000..b7bc2b9 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/RunningChecker.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.io.Serializable; + +/** Serializable holder of a run flag; the field is volatile so a value set by one thread is seen by threads polling isRunning(). */ public class RunningChecker implements Serializable { + private volatile boolean isRunning = false; + + /** @return the current flag value (initially false) */ public boolean isRunning() { + return isRunning; + } + + /** @param running new flag value */ public void setRunning(boolean running) { + isRunning = running; + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java new file mode 100644 index 0000000..264d211 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.rocketmq.flink.common.selector; + +/** TopicSelector that returns the same topic and tag for every record; the single-argument constructor uses an empty tag. */ public class DefaultTopicSelector<T> implements TopicSelector<T> { + private final String topicName; + private final String tagName; + + public DefaultTopicSelector(final String topicName, final String tagName) { + this.topicName = topicName; + this.tagName = tagName; + } + + public DefaultTopicSelector(final String topicName) { + this(topicName, ""); + } + + @Override + public String getTopic(T tuple) { + return topicName; + } + + @Override + public String getTag(T tuple) { + return tagName; + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java new file mode 100644 index 0000000..3ad8a03 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.flink.common.selector; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Selects topic and tag by looking up configured keys in a Map record, falling back to defaults when a key is absent or maps to null.
+ */ +public class SimpleTopicSelector implements TopicSelector<Map> { + private static final Logger LOG = LoggerFactory.getLogger(SimpleTopicSelector.class); + + private final String topicFieldName; + private final String defaultTopicName; + + private final String tagFieldName; + private final String defaultTagName; + + /** + * SimpleTopicSelector Constructor. + * @param topicFieldName map key whose value is used as the topic + * @param defaultTopicName topic returned when that key is missing or maps to null + * @param tagFieldName map key whose value is used as the tag + * @param defaultTagName tag returned when that key is missing or maps to null + */ + public SimpleTopicSelector(String topicFieldName, String defaultTopicName, String tagFieldName, String defaultTagName) { + this.topicFieldName = topicFieldName; + this.defaultTopicName = defaultTopicName; + this.tagFieldName = tagFieldName; + this.defaultTagName = defaultTagName; + } + + /** Returns the value under topicFieldName via toString(), or defaultTopicName when the key is missing (logged at WARN for every such record) or maps to null. */ @Override + public String getTopic(Map tuple) { + if (tuple.containsKey(topicFieldName)) { + Object topic = tuple.get(topicFieldName); + return topic != null ? topic.toString() : defaultTopicName; + } else { + LOG.warn("Field {} Not Found. Returning default topic {}", topicFieldName, defaultTopicName); + return defaultTopicName; + } + } + + /** Returns the value under tagFieldName via toString(), or defaultTagName when the key is missing (logged at WARN for every such record) or maps to null. */ @Override + public String getTag(Map tuple) { + if (tuple.containsKey(tagFieldName)) { + Object tag = tuple.get(tagFieldName); + return tag != null ? tag.toString() : defaultTagName; + } else { + LOG.warn("Field {} Not Found. 
Returning default tag {}", tagFieldName, defaultTagName); + return defaultTagName; + } + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java b/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java new file mode 100644 index 0000000..2a347db --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.selector; + +import java.io.Serializable; + +/** Chooses the topic and tag for a record of type T. */ public interface TopicSelector<T> extends Serializable { + + /** @return topic name for the given record */ String getTopic(T tuple); + + /** @return tag for the given record */ String getTag(T tuple); +} diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java new file mode 100644 index 0000000..d8759f9 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.serialization; + +import java.io.Serializable; + +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + +/** Builds a record of type T from a message's key and body bytes; getProducedType() (from ResultTypeQueryable) reports T's type information. */ public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>, Serializable { + /** @param key message key bytes, may be null @param value message body bytes @return the decoded record */ T deserializeKeyAndValue(byte[] key, byte[] value); +} diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java new file mode 100644 index 0000000..d847e8a --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.serialization; + +import java.io.Serializable; + +/** Extracts key and value bytes from a record of type T. */ public interface KeyValueSerializationSchema<T> extends Serializable { + + /** @return key bytes for the record; implementations may return null */ byte[] serializeKey(T tuple); + + /** @return value bytes for the record; implementations may return null */ byte[] serializeValue(T tuple); +} diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java new file mode 100644 index 0000000..df6390b --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.rocketmq.flink.common.serialization; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** Decodes key and body as UTF-8 strings into a HashMap under keyField and valueField; a null field name omits that entry, a null byte array stores a null value. */ public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema<Map> { + public static final String DEFAULT_KEY_FIELD = "key"; + public static final String DEFAULT_VALUE_FIELD = "value"; + + public String keyField; + public String valueField; + + public SimpleKeyValueDeserializationSchema() { + this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD); + } + + /** + * SimpleKeyValueDeserializationSchema Constructor. + * @param keyField map key under which the decoded message key is stored, or null to omit it + * @param valueField map key under which the decoded message body is stored, or null to omit it + */ + public SimpleKeyValueDeserializationSchema(String keyField, String valueField) { + this.keyField = keyField; + this.valueField = valueField; + } + + @Override + public Map deserializeKeyAndValue(byte[] key, byte[] value) { + HashMap map = new HashMap(2); + if (keyField != null) { + String k = key != null ? new String(key, StandardCharsets.UTF_8) : null; + map.put(keyField, k); + } + if (valueField != null) { + String v = value != null ? new String(value, StandardCharsets.UTF_8) : null; + map.put(valueField, v); + } + return map; + } + + @Override + public TypeInformation<Map> getProducedType() { + return TypeInformation.of(Map.class); + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java new file mode 100644 index 0000000..bbd6da3 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.serialization; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +/** Reads key and value from a Map record via keyField/valueField and encodes their toString() as UTF-8; a null record, null field name or null entry yields null. */ public class SimpleKeyValueSerializationSchema implements KeyValueSerializationSchema<Map> { + public static final String DEFAULT_KEY_FIELD = "key"; + public static final String DEFAULT_VALUE_FIELD = "value"; + + public String keyField; + public String valueField; + + public SimpleKeyValueSerializationSchema() { + this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD); + } + + /** + * SimpleKeyValueSerializationSchema Constructor. + * @param keyField map key whose value serializeKey encodes + * @param valueField map key whose value serializeValue encodes + */ + public SimpleKeyValueSerializationSchema(String keyField, String valueField) { + this.keyField = keyField; + this.valueField = valueField; + } + + @Override + public byte[] serializeKey(Map tuple) { + if (tuple == null || keyField == null) { + return null; + } + Object key = tuple.get(keyField); + return key != null ? key.toString().getBytes(StandardCharsets.UTF_8) : null; + } + + @Override + public byte[] serializeValue(Map tuple) { + if (tuple == null || valueField == null) { + return null; + } + Object value = tuple.get(valueField); + return value != null ? 
value.toString().getBytes(StandardCharsets.UTF_8) : null; + } + +} diff --git a/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java b/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java new file mode 100644 index 0000000..ec844f2 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.rocketmq.flink; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.flink.common.selector.DefaultTopicSelector; +import org.apache.rocketmq.flink.common.selector.TopicSelector; +import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema; +import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueSerializationSchema; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.rocketmq.flink.TestUtils.setFieldValue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** Unit tests for RocketMQSink with its producer field replaced by a Mockito mock via reflection. */ public class RocketMQSinkTest { + + private RocketMQSink rocketMQSink; + private DefaultMQProducer producer; + + @Before + public void setUp() throws Exception { + KeyValueSerializationSchema serializationSchema = new SimpleKeyValueSerializationSchema("id", "name"); + TopicSelector topicSelector = new DefaultTopicSelector("tpc"); + Properties props = new Properties(); + rocketMQSink = new RocketMQSink(serializationSchema, topicSelector, props); + + producer = mock(DefaultMQProducer.class); + setFieldValue(rocketMQSink, "producer", producer); + } + + @Test + public void testSink() throws Exception { + Map tuple = new HashMap(); + tuple.put("id", "x001"); + tuple.put("name", "vesense"); + tuple.put("tpc", "tpc1"); /* not used for routing: DefaultTopicSelector always returns "tpc" */ + + rocketMQSink.invoke(tuple, null); + + verify(producer).send(any(Message.class)); + + } + + @Test + public void close() throws Exception { + rocketMQSink.close(); + + verify(producer).shutdown(); + } + +} \ No newline at end of file diff --git a/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java b/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java new file mode 100644 index 0000000..b7aaee0 --- /dev/null +++
b/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema; +import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.rocketmq.flink.TestUtils.setFieldValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static 
org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Unit tests for RocketMQSource using a real pull scheduler around a mocked DefaultMQPullConsumer, injected via reflection. */ public class RocketMQSourceTest { + + private RocketMQSource rocketMQSource; + private MQPullConsumerScheduleService pullConsumerScheduleService; + private DefaultMQPullConsumer consumer; + private KeyValueDeserializationSchema deserializationSchema; + private String topic = "tpc"; + + @Before + public void setUp() throws Exception { + deserializationSchema = new SimpleKeyValueDeserializationSchema(); + Properties props = new Properties(); + rocketMQSource = new RocketMQSource(deserializationSchema, props); + + setFieldValue(rocketMQSource, "topic", topic); + setFieldValue(rocketMQSource, "runningChecker", new SingleRunningCheck()); + setFieldValue(rocketMQSource, "offsetTable", new ConcurrentHashMap<>()); + setFieldValue(rocketMQSource, "restoredOffsets", new ConcurrentHashMap<>()); + + pullConsumerScheduleService = new MQPullConsumerScheduleService("g"); + + consumer = mock(DefaultMQPullConsumer.class); + pullConsumerScheduleService.setDefaultMQPullConsumer(consumer); + setFieldValue(rocketMQSource, "consumer", consumer); + setFieldValue(rocketMQSource, "pullConsumerScheduleService", pullConsumerScheduleService); + } + + @Test + public void testSource() throws Exception { + List<MessageExt> msgFoundList = new ArrayList<>(); + MessageExt messageExt = new MessageExt(); + messageExt.setKeys("keys"); + messageExt.setBody("body data".getBytes()); + messageExt.setBornTimestamp(System.currentTimeMillis()); + msgFoundList.add(messageExt); + PullResult pullResult = new PullResult(PullStatus.FOUND, 3, 1, 5, msgFoundList); + + 
when(consumer.fetchConsumeOffset(any(MessageQueue.class), anyBoolean())).thenReturn(2L); + when(consumer.pull(any(MessageQueue.class), anyString(), anyLong(), 
anyInt())).thenReturn(pullResult); + + SourceContext context = mock(SourceContext.class); + when(context.getCheckpointLock()).thenReturn(new Object()); + + rocketMQSource.run(context); + + // schedule the pull task + Set<MessageQueue> set = new HashSet(); + set.add(new MessageQueue(topic, "brk", 1)); + pullConsumerScheduleService.putTask(topic, set); + + MessageExt msg = pullResult.getMsgFoundList().get(0); + + // the pull runs on the scheduler's own thread, so wait for it; atLeastOnce because a queue with messages is re-pulled without delay + verify(context, timeout(5000).atLeastOnce()).collectWithTimestamp(deserializationSchema.deserializeKeyAndValue(msg.getKeys().getBytes(), + msg.getBody()), msg.getBornTimestamp()); + } + + @Test + public void close() throws Exception { + rocketMQSource.close(); + + verify(consumer).shutdown(); + } + + /** Always reports not running, so run() returns right after starting the pull scheduler. */ class SingleRunningCheck extends RunningChecker { + @Override + public boolean isRunning() { + return false; + } + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/rocketmq/flink/TestUtils.java b/src/test/java/org/apache/rocketmq/flink/TestUtils.java new file mode 100644 index 0000000..d0a9450 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/flink/TestUtils.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.rocketmq.flink; + +import java.lang.reflect.Field; + +/** Reflection helpers shared by the tests. */ public class TestUtils { + /** Sets a field declared directly on obj's runtime class, bypassing access checks; fails loudly instead of letting a test run against an un-injected field. */ public static void setFieldValue(Object obj, String fieldName, Object value) { + try { + Field field = obj.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(obj, value); + } catch (ReflectiveOperationException e) { + throw new IllegalStateException(String.format("Cannot set field %s on %s", fieldName, obj.getClass().getName()), e); + } + } +} diff --git a/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java new file mode 100644 index 0000000..2f4685c --- /dev/null +++ b/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.rocketmq.flink.common.selector; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class DefaultTopicSelectorTest { + @Test + public void getTopic() throws Exception { + DefaultTopicSelector selector = new DefaultTopicSelector("rocket"); + assertEquals("rocket", selector.getTopic(null)); + assertEquals("", selector.getTag(null)); + + selector = new DefaultTopicSelector("rocket", "tg"); + assertEquals("rocket", selector.getTopic(null)); + assertEquals("tg", selector.getTag(null)); + } + +} \ No newline at end of file diff --git a/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java new file mode 100644 index 0000000..6ac1a57 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.flink.common.selector; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class SimpleTopicSelectorTest { + @Test + public void getTopic() throws Exception { + SimpleTopicSelector selector = new SimpleTopicSelector("tpc", "dtpc", "tg", "dtg"); + Map tuple = new HashMap(); + tuple.put("id", "x001"); + tuple.put("name", "vesense"); + tuple.put("tpc", "tpc1"); + tuple.put("tg", "tg1"); + + assertEquals("tpc1", selector.getTopic(tuple)); + assertEquals("tg1", selector.getTag(tuple)); + + tuple = new HashMap(); + tuple.put("id", "x001"); + tuple.put("name", "vesense"); + + assertEquals("dtpc", selector.getTopic(tuple)); + assertEquals("dtg", selector.getTag(tuple)); + } + +} \ No newline at end of file diff --git a/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java new file mode 100644 index 0000000..98aa793 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.common.serialization; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class SimpleKeyValueSerializationSchemaTest { + @Test + public void serializeKeyAndValue() throws Exception { + SimpleKeyValueSerializationSchema serializationSchema = new SimpleKeyValueSerializationSchema("id", "name"); + SimpleKeyValueDeserializationSchema deserializationSchema = new SimpleKeyValueDeserializationSchema("id", "name"); + + Map tuple = new HashMap(); + tuple.put("id", "x001"); + tuple.put("name", "vesense"); + + assertEquals(tuple, deserializationSchema.deserializeKeyAndValue(serializationSchema.serializeKey(tuple), + serializationSchema.serializeValue(tuple))); + } + +} \ No newline at end of file diff --git a/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java b/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java new file mode 100644 index 0000000..1b07b8d --- /dev/null +++ b/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.example; + +import java.util.List; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; + +public class ConsumerTest { + public static void main(String[] args) { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g00003"); + consumer.setNamesrvAddr("localhost:9876"); + try { + consumer.subscribe("flink-sink2", "*"); + } catch (MQClientException e) { + e.printStackTrace(); + } + consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { + for (MessageExt msg : msgs) { + System.out.println(msg.getKeys() + ":" + new String(msg.getBody())); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + try { + consumer.start(); + } catch (MQClientException e) { + e.printStackTrace(); + } + } +} diff --git a/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java b/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java new file mode 100644 index 0000000..c04ca74 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.example; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; + +public class ProducerTest { + public static void main(String[] args) { + DefaultMQProducer producer = new DefaultMQProducer("p001"); + producer.setNamesrvAddr("localhost:9876"); + try { + producer.start(); + } catch (MQClientException e) { + e.printStackTrace(); + } + for (int i = 0; i < 10000; i++) { + Message msg = new Message("flink-source2" , "", "id_"+i, ("country_X province_" + i).getBytes()); + try { + producer.send(msg); + } catch (MQClientException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } catch (MQBrokerException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("send " + i); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} diff --git a/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java b/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java new file mode 100644 index 0000000..b2a4034 --- /dev/null +++ 
b/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.flink.example; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.rocketmq.flink.RocketMQConfig; +import org.apache.rocketmq.flink.RocketMQSink; +import org.apache.rocketmq.flink.RocketMQSource; +import org.apache.rocketmq.flink.common.selector.DefaultTopicSelector; +import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema; +import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueSerializationSchema; + +public class RocketMQFlinkExample { + public static void main(String[] args) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // enable checkpoint + env.enableCheckpointing(3000); + + Properties consumerProps = new Properties(); + consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876"); + 
consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "c002"); + consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "flink-source2"); + + Properties producerProps = new Properties(); + producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876"); + + env.addSource(new RocketMQSource(new SimpleKeyValueDeserializationSchema("id", "address"), consumerProps)) + .name("rocketmq-source") + .setParallelism(2) + .process(new ProcessFunction<Map, Map>() { + @Override + public void processElement(Map in, Context ctx, Collector<Map> out) throws Exception { + HashMap result = new HashMap(); + result.put("id", in.get("id")); + String[] arr = in.get("address").toString().split("\\s+"); + result.put("province", arr[arr.length-1]); + out.collect(result); + } + }) + .name("upper-processor") + .setParallelism(2) + .addSink(new RocketMQSink(new SimpleKeyValueSerializationSchema("id", "province"), + new DefaultTopicSelector("flink-sink2"), producerProps).withBatchFlushOnCheckpoint(true)) + .name("rocketmq-sink") + .setParallelism(2); + + try { + env.execute("rocketmq-flink-example"); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/style/copyright/Apache.xml b/style/copyright/Apache.xml new file mode 100644 index 0000000..2db86d0 --- /dev/null +++ b/style/copyright/Apache.xml @@ -0,0 +1,24 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<component name="CopyrightManager"> + <copyright> + <option name="myName" value="Apache"/> + <option name="notice" + value="Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/lice [...] + </copyright> +</component> \ No newline at end of file diff --git a/style/copyright/profiles_settings.xml b/style/copyright/profiles_settings.xml new file mode 100644 index 0000000..4c0e521 --- /dev/null +++ b/style/copyright/profiles_settings.xml @@ -0,0 +1,64 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<component name="CopyrightManager"> + <settings default="Apache"> + <module2copyright> + <element module="All" copyright="Apache"/> + </module2copyright> + <LanguageOptions name="GSP"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="HTML"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="JAVA"> + <option name="fileTypeOverride" value="3"/> + <option name="addBlankAfter" value="false"/> + </LanguageOptions> + <LanguageOptions name="JSP"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="JSPX"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="MXML"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="Properties"> + <option name="fileTypeOverride" value="3"/> + <option name="block" value="false"/> + </LanguageOptions> + <LanguageOptions name="SPI"> + <option name="fileTypeOverride" value="3"/> + <option name="block" value="false"/> + </LanguageOptions> + <LanguageOptions name="XML"> + <option name="fileTypeOverride" value="3"/> + <option name="prefixLines" value="false"/> + </LanguageOptions> + <LanguageOptions name="__TEMPLATE__"> + <option name="separateBefore" value="true"/> + <option name="lenBefore" value="1"/> + </LanguageOptions> + </settings> +</component> \ No newline at end of file diff --git a/style/rmq_checkstyle.xml b/style/rmq_checkstyle.xml new file mode 100644 index 0000000..e3155cc --- /dev/null +++ b/style/rmq_checkstyle.xml @@ -0,0 +1,135 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the 
Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<!DOCTYPE module PUBLIC + "-//Puppy Crawl//DTD Check Configuration 1.3//EN" + "http://www.puppycrawl.com/dtds/configuration_1_3.dtd"> +<!--Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding --> +<module name="Checker"> + + <property name="localeLanguage" value="en"/> + + <!--To configure the check to report on the first instance in each file--> + <module name="FileTabCharacter"/> + + <!-- header --> + <module name="RegexpHeader"> + <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="System\.out\.println"/> + <property name="message" value="Prohibit invoking System.out.println in source code !"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="//FIXME"/> + <property name="message" value="Recommended fix FIXME task !"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="//TODO"/> + <property name="message" value="Recommended fix TODO task !"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" value="@alibaba"/> + <property name="message" 
value="Recommended remove @alibaba keyword!"/> + </module> + <module name="RegexpSingleline"> + <property name="format" value="@taobao"/> + <property name="message" value="Recommended remove @taobao keyword!"/> + </module> + <module name="RegexpSingleline"> + <property name="format" value="@author"/> + <property name="message" value="Recommended remove @author tag in javadoc!"/> + </module> + + <module name="RegexpSingleline"> + <property name="format" + value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/> + <property name="message" value="Not allow chinese character !"/> + </module> + + <module name="FileLength"> + <property name="max" value="3000"/> + </module> + + <module name="TreeWalker"> + + <module name="UnusedImports"> + <property name="processJavadoc" value="true"/> + </module> + <module name="RedundantImport"/> + + <!--<module name="IllegalImport" />--> + + <!--Checks that classes that override equals() also override hashCode()--> + <module name="EqualsHashCode"/> + <!--Checks for over-complicated boolean expressions. Currently finds code like if (topic == true), topic || true, !false, etc.--> + <module name="SimplifyBooleanExpression"/> + <module name="OneStatementPerLine"/> + <module name="UnnecessaryParentheses"/> + <!--Checks for over-complicated boolean return statements. 
For example, code such as if (valid()) return false; else return true; can be simplified to return !valid();--> + <module name="SimplifyBooleanReturn"/> + + <!--Check that the default is after all the cases in a switch statement--> + <module name="DefaultComesLast"/> + <!--Detects empty statements (standalone ";" semicolon)--> + <module name="EmptyStatement"/> + <!--Checks that long constants are defined with an upper ell--> + <module name="UpperEll"/> + <module name="ConstantName"> + <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/> + </module> + <!--Checks that local, non-final variable names conform to a format specified by the format property--> + <module name="LocalVariableName"/> + <!--Validates identifiers for local, final variables, including catch parameters--> + <module name="LocalFinalVariableName"/> + <!--Validates identifiers for non-static fields--> + <module name="MemberName"/> + <!--Validates identifiers for class type parameters--> + <module name="ClassTypeParameterName"> + <property name="format" value="^[A-Z0-9]*$"/> + </module> + <!--Validates identifiers for method type parameters--> + <module name="MethodTypeParameterName"> + <property name="format" value="^[A-Z0-9]*$"/> + </module> + <module name="PackageName"/> + <module name="ParameterName"/> + <module name="StaticVariableName"/> + <module name="TypeName"/> + <!--Checks that there are no import statements that use the * notation--> + <module name="AvoidStarImport"/> + + <!--whitespace--> + <module name="GenericWhitespace"/> + <module name="NoWhitespaceBefore"/> + <module name="WhitespaceAfter"/> + <module name="NoWhitespaceAfter"/> + <module name="WhitespaceAround"> + <property name="allowEmptyConstructors" value="true"/> + <property name="allowEmptyMethods" value="true"/> + </module> + <module name="Indentation"/> + <module name="MethodParamPad"/> + <module name="ParenPad"/> + <module name="TypecastParenPad"/> + </module> +</module> diff --git a/style/rmq_codeStyle.xml b/style/rmq_codeStyle.xml new file mode 100644
index 0000000..cd95ee6 --- /dev/null +++ b/style/rmq_codeStyle.xml @@ -0,0 +1,157 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<code_scheme name="rocketmq"> + <option name="USE_SAME_INDENTS" value="true"/> + <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/> + <option name="OTHER_INDENT_OPTIONS"> + <value> + <option name="INDENT_SIZE" value="4"/> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + <option name="TAB_SIZE" value="4"/> + <option name="USE_TAB_CHARACTER" value="false"/> + <option name="SMART_TABS" value="false"/> + <option name="LABEL_INDENT_SIZE" value="0"/> + <option name="LABEL_INDENT_ABSOLUTE" value="false"/> + <option name="USE_RELATIVE_INDENTS" value="false"/> + </value> + </option> + <option name="PREFER_LONGER_NAMES" value="false"/> + <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/> + <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/> + <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND"> + <value/> + </option> + <option name="IMPORT_LAYOUT_TABLE"> + <value> + <package name="" withSubpackages="true" static="false"/> + <emptyLine/> + <package name="" withSubpackages="true" static="true"/> + </value> + </option> + <option 
name="JD_ALIGN_PARAM_COMMENTS" value="false"/> + <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/> + <option name="JD_P_AT_EMPTY_LINES" value="false"/> + <option name="JD_KEEP_INVALID_TAGS" value="false"/> + <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/> + <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ELSE_ON_NEW_LINE" value="true"/> + <option name="WHILE_ON_NEW_LINE" value="true"/> + <option name="CATCH_ON_NEW_LINE" value="true"/> + <option name="FINALLY_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="SPACE_AFTER_TYPE_CAST" value="false"/> + <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/> + <option name="LABELED_STATEMENT_WRAP" value="1"/> + <option name="WRAP_COMMENTS" value="true"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <JavaCodeStyleSettings> + <option name="CLASS_NAMES_IN_JAVADOC" value="3"/> + </JavaCodeStyleSettings> + <XML> + <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/> + </XML> + <ADDITIONAL_INDENT_OPTIONS fileType="haml"> + <option name="INDENT_SIZE" value="2"/> + </ADDITIONAL_INDENT_OPTIONS> + <codeStyleSettings language="Groovy"> + <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ELSE_ON_NEW_LINE" value="true"/> + <option 
name="CATCH_ON_NEW_LINE" value="true"/> + <option name="FINALLY_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="SPACE_AFTER_TYPE_CAST" value="false"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + <indentOptions> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> + <codeStyleSettings language="HOCON"> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + </codeStyleSettings> + <codeStyleSettings language="JAVA"> + <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ELSE_ON_NEW_LINE" value="true"/> + <option name="WHILE_ON_NEW_LINE" value="true"/> + <option name="CATCH_ON_NEW_LINE" value="true"/> + <option name="FINALLY_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="SPACE_AFTER_TYPE_CAST" value="false"/> + <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/> + <option name="LABELED_STATEMENT_WRAP" value="1"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + <indentOptions> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + 
</indentOptions> + </codeStyleSettings> + <codeStyleSettings language="JSON"> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + </codeStyleSettings> + <codeStyleSettings language="Scala"> + <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/> + <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/> + <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/> + <option name="ELSE_ON_NEW_LINE" value="true"/> + <option name="WHILE_ON_NEW_LINE" value="true"/> + <option name="CATCH_ON_NEW_LINE" value="true"/> + <option name="FINALLY_ON_NEW_LINE" value="true"/> + <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/> + <option name="ALIGN_MULTILINE_FOR" value="false"/> + <option name="METHOD_PARAMETERS_WRAP" value="1"/> + <option name="METHOD_ANNOTATION_WRAP" value="1"/> + <option name="CLASS_ANNOTATION_WRAP" value="1"/> + <option name="FIELD_ANNOTATION_WRAP" value="1"/> + <option name="PARENT_SETTINGS_INSTALLED" value="true"/> + <indentOptions> + <option name="INDENT_SIZE" value="4"/> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + <option name="TAB_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> + <codeStyleSettings language="XML"> + <indentOptions> + <option name="CONTINUATION_INDENT_SIZE" value="4"/> + </indentOptions> + </codeStyleSettings> +</code_scheme> \ No newline at end of file
