[ROCKETMQ-81] Add the RocketMq plugin for the Apache Spark, thanks @hustfxj for the pull part, closes apache/incubator-rocketmq-externals#4, closes apache/incubator-rocketmq-externals#5
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/7ceba1b2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/7ceba1b2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/7ceba1b2 Branch: refs/heads/master Commit: 7ceba1b232cdef82d69a1d8af7821b878ff29b52 Parents: 42fc223 Author: vesense <[email protected]> Authored: Thu Apr 27 20:53:45 2017 +0800 Committer: yukon <[email protected]> Committed: Thu Apr 27 21:09:25 2017 +0800 ---------------------------------------------------------------------- .gitignore | 1 + README.md | 9 +- rocketmq-spark/.gitignore | 7 + rocketmq-spark/README.md | 229 ++++++++ rocketmq-spark/pom.xml | 193 +++++++ .../rocketmq/spark/OffsetCommitCallback.java | 33 ++ .../apache/rocketmq/spark/RocketMQConfig.java | 179 +++++++ .../org/apache/rocketmq/spark/TopicQueueId.java | 80 +++ .../streaming/DefaultMessageRetryManager.java | 86 +++ .../spark/streaming/MessageRetryManager.java | 49 ++ .../rocketmq/spark/streaming/MessageSet.java | 89 +++ .../streaming/ReliableRocketMQReceiver.java | 111 ++++ .../spark/streaming/RocketMQReceiver.java | 118 ++++ .../rocketmq/spark/CachedMQConsumer.scala | 177 ++++++ .../rocketmq/spark/ConsumerStrategy.scala | 85 +++ .../rocketmq/spark/LocationStrategy.scala | 66 +++ .../org/apache/rocketmq/spark/Logging.scala | 60 +++ .../org/apache/rocketmq/spark/OffsetRange.scala | 126 +++++ .../rocketmq/spark/RocketMqRDDPartition.scala | 55 ++ .../apache/rocketmq/spark/RocketMqUtils.scala | 233 ++++++++ .../spark/streaming/MQPullInputDStream.scala | 535 +++++++++++++++++++ .../rocketmq/spark/streaming/RocketMqRDD.scala | 230 ++++++++ .../rocketmq/spark/RocketMQServerMock.java | 119 +++++ .../streaming/MessageRetryManagerTest.java | 112 ++++ .../streaming/ReliableRocketMQReceiverTest.java | 73 +++ .../spark/streaming/RocketMQReceiverTest.java | 73 +++ 
.../spark/streaming/RocketMqUtilsTest.java | 162 ++++++ rocketmq-spark/style/copyright/Apache.xml | 24 + .../style/copyright/profiles_settings.xml | 64 +++ rocketmq-spark/style/rmq_checkstyle.xml | 135 +++++ rocketmq-spark/style/rmq_codeStyle.xml | 157 ++++++ rocketmq-storm/README.md | 2 + 32 files changed, 3669 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index da8c211..07a5fa6 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ There are some RocketMQ external projects, with the purpose of growing the Rocke ## RocketMQ-Console-Ng A console for RocketMQ + ## RocketMQ-JMS RocketMQ-JMS is an implement of JMS specification,taking Apache RocketMQ as broker. Now we are on the way of supporting JMS 1.1 and JMS2.0 is our final target. @@ -20,10 +21,12 @@ This project is used to receive and send messages between 4. Copy the jar depended by rocketmq-flume to `$FLUME_HOME/lib`(the specific jar will be given later) -## RocketMQ-Docker -Apache RocketMQ Docker provides Dockerfile and bash scripts for building and running docker image. - +## RocketMQ-Spark +Apache Spark-Streaming integration with RocketMQ. Both push & pull consumer mode are provided. +For more details please refer to rocketmq-spark README.md. +## RocketMQ-Docker +Apache RocketMQ Docker provides Dockerfile and bash scripts for building and running docker image. 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/.gitignore ---------------------------------------------------------------------- diff --git a/rocketmq-spark/.gitignore b/rocketmq-spark/.gitignore new file mode 100644 index 0000000..400aa01 --- /dev/null +++ b/rocketmq-spark/.gitignore @@ -0,0 +1,7 @@ +target/ +.idea/ +.settings/ +.project +.classpath +*.iml +/bin/ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/README.md ---------------------------------------------------------------------- diff --git a/rocketmq-spark/README.md b/rocketmq-spark/README.md new file mode 100644 index 0000000..5a5cd04 --- /dev/null +++ b/rocketmq-spark/README.md @@ -0,0 +1,229 @@ +rocketmq-spark +========================== + +This project is used to receive message from Rocketmq for Spark Streaming. Both push & pull consumer mode are provided. It provides simple parallelism, 1:1 correspondence between RocketMq's message queue id and Spark partitions. + +## Install +For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact. + + groupId = org.apache.rocketmq + artifactId = rocketmq-spark + version = 0.0.1-SNAPSHOT + +In fact we may not find the artifact, So we should execute the following command in rocketmq-spark root directory firstly. 
+ + `mvn clean install dependency:copy-dependencies` + +## Creating a RocketMq Stream + +For Scala: + +``` + val dStream: InputDStream[MessageExt] = RocketMqUtils.createMQPullStream(streamingContext, groupId, topic, ConsumerStrategy.earliest, true, false, false) + + dStream.map(message => (message.getBody)).print() +``` + +For Java: + +``` + JavaInputDStream<MessageExt> stream = RocketMqUtils.createJavaMQPullStream(javaStreamingContext, groupId, + topics, ConsumerStrategy.earliest(), true, false, false); + + stream.foreachRDD(new VoidFunction<JavaRDD<MessageExt>>() { + @Override + public void call(JavaRDD<MessageExt> messageExtJavaRDD) throws Exception { + messageExtJavaRDD.foreach(new VoidFunction<MessageExt>() { + @Override + public void call(MessageExt messageExt) throws Exception { + System.out.println(messageExt.toString()); + } + }); + } + }); +``` + +## Creating a RocketMq RDD + +For Scala: + +``` + val offsetRanges = new util.HashMap[TopicQueueId, Array[OffsetRange]] + val topicQueueId1 = new TopicQueueId("topic", 1) + val ranges1 = Array(OffsetRange("groupId", 1, "broker-1", 0, 100), OffsetRange("groupId", 1, "broker-2", 0, 100)) + offsetRanges.put(topicQueueId1, ranges1) + val topicQueueId2 = new TopicQueueId("topic", 2) + val ranges2 = Array(OffsetRange("groupId", 2, "broker-1", 0, 100), OffsetRange("groupId", 2, "broker-2", 0, 100)) + offsetRanges.put(topicQueueId2, ranges2) + val optionParams = new util.HashMap[String, String] + + val rdd: RDD[MessageExt] = RocketMqUtils.createRDD(sparkContext, groupId, offsetRanges, optionParams) + rdd.foreach(message => System.out.println(message.getBody)) +``` + +For Java: + +``` + Map<TopicQueueId, OffsetRange[]> offsetRanges = new HashMap<>(); + TopicQueueId topicQueueId1 = new TopicQueueId("topic", 1); + OffsetRange [] ranges1 = {OffsetRange.create("groupId", 1, "broker-1", 0, 100), + OffsetRange.create("groupId", 1, "broker-2", 0, 100)}; + offsetRanges.put(topicQueueId1, ranges1); + + TopicQueueId topicQueueId2 
= new TopicQueueId("topic", 2); + OffsetRange [] ranges2 = {OffsetRange.create("groupId", 2, "broker-1", 0, 100), + OffsetRange.create("groupId", 2, "broker-2", 0, 100)}; + offsetRanges.put(topicQueueId2, ranges2); + + Map<String, String> optionParams= new HashMap(); + LocationStrategy locationStrategy = LocationStrategies.PreferConsistent(); + + + JavaRDD<MessageExt> rdd = RocketMqUtils.createJavaRDD(sparkContext, groupId, offsetRanges, + optionParams, locationStrategy); + + rdd.foreach(new VoidFunction<MessageExt>() { + @Override + public void call(MessageExt messageExt) throws Exception { + System.out.println(messageExt.getBodyCRC()); + } + }); +``` + +## LocationStrategies + +The RocketMq consumer API will pre-fetch messages into buffers. Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers. + +In most cases, you should use `LocationStrategies.PreferConsistent` as shown above. This will distribute partitions evenly across available executors. Finally, if you have a significant skew in load among partitions, use `PreferFixed`. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location). + +The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) RocketMq partitions, you can change this setting via `pull.consumer.cache.maxCapacity` + +## ConsumerStrategy + +The RocketMq consumer will start to consume from different offset based on different consumer strategy. + +1. EarliestStrategy: Specify the earliest available offset for every message queue to start to consume. But if the Rocketmq server has checkpoint for a message queue, then the consumer will consume from the checkpoint. + +2. 
LatestStrategy: Specify the latest available offset for every message queue to start to consume. But if the Rocketmq server has checkpoint for a message queue, then the consumer will consume from the checkpoint. + +3. SpecificOffsetStrategy: Specify the specific available offset for every message queue to start to consume. Generally if the Rocketmq server has checkpoint for a message queue, then the consumer will consume from the checkpoint. But if the forceSpecial is true, the consumer will start to consume from the specific available offset in any case. Of course, the consumer will start to consume from the min available offset if a message queue is not specified. If the specified offset is 'ConsumerStrategy.LATEST' for a message queue, it indicates resolution to the latest offset. And if the specified offset is 'ConsumerStrategy.EARLIEST', it indicates resolution to the earliest offset. + +## Obtaining Offsets + +Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the result of createMQPullStream, not later down a chain of methods. You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. Be aware that the one-to-one mapping between RDD partition and RocketMq partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window(). + +`dStream.foreachRDD { rdd => + val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + rdd.foreachPartition { iter => + val queueId = TaskContext.get.partitionId + val offsets: Array[OffsetRange] = offsetRanges.get(new TopicQueueId(topic, queueId)) + } + }` + + +## Storing Offsets + +RocketMq delivery semantics in the case of failure depend on how and when offsets are stored. Spark output operations are at-least-once. 
So if you want the equivalent of exactly-once semantics, you must either store offsets after an idempotent output, or store offsets in an atomic transaction alongside output. With this integration, you have 3 options, in order of increasing reliability (and code complexity), for how to store offsets. + +### Checkpoints + +If you enable Spark checkpointing, offsets will be stored in the checkpoint. This is easy to enable, but there are drawbacks. Your output operation must be idempotent, since you will get repeated outputs; transactions are not an option. Furthermore, you cannot recover from a checkpoint if your application code has changed. For planned upgrades, you can mitigate this by running the new code at the same time as the old code (since outputs need to be idempotent anyway, they should not clash). But for unplanned failures that require code changes, you will lose data unless you have another way to identify known good starting offsets. + +### Storing offsets based on RocketMq Server + +RocketMq has an offset commit API that stores offsets in a special RocketMq topic. By default, the new consumer will auto-commit offsets by setting "autoCommit" true. This is almost certainly not what you want, because messages successfully polled by the consumer may not yet have resulted in a Spark output operation, resulting in undefined semantics. Then messages may be lost. However, you can commit offsets to RocketMq after your output has been stored, using the commitAsync API. At the same time, you must set "autoCommit" to false. The benefit as compared to checkpoints is that RocketMq is a durable store regardless of changes to your application code. However, RocketMq is not transactional, so your outputs must still be idempotent. 
+ +For Scala: + +``` + //store commits + dStream.foreachRDD { rdd => + val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + + // some time later, after outputs have completed + dStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) + } +``` + +For Java: + +``` + dStream.foreachRDD(rdd -> { + OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + + // some time later, after outputs have completed + ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges); + }); +``` + + +### Commit offsets based on your own data store + +For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you're careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics. It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent. 
+ +``` + // begin from the the offsets committed to the database + val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet => + new MessageQueue(resultSet.string("topic"), resultSet.string("broker"), + resultSet.int("queueId")) -> resultSet.long("offset") + }.toMap + + val specificStrategy = ConsumerStrategy.specificOffset(fromOffsets) + val stream = RocketMqUtils + .createMQPullStream(streamingContext, groupId, topic, specificStrategy, false, true, true) + + stream.foreachRDD { rdd => + val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + + val results = yourCalculation(rdd) + + // begin your transaction + + // update results + // update offsets where the end of existing offsets matches the beginning of this batch of offsets + // assert that offsets were updated correctly + + // end your transaction + } +``` + +## RocketMQConfig + +_The following configs are for Consumer Pull Mode_ + +|Property Name | Default | Meaning | +| ------------ | --------| ------ | +| pull.max.speed.per.partition | -1 | Maximum rate (number of records per second) at which data will be read from each RocketMq partition, and the default value is "-1", it means consumer can pull message from rocketmq as fast as the consumer can. Other that, you also enables or disables Spark Streaming's internal backpressure mechanism by the config "spark.streaming.backpressure.enabled". | +|pull.max.batch.size|32|To pick up the consume speed, the consumer can pull a batch of messages at a time.| +|pull.timeout.ms|3000|pull timeout for the RocketMq consumer| +|pull.consumer.cache.initialCapacity| 16|the configs for consumer cache| +|pull.consumer.cache.maxCapacity| 64|the configs for consumer cache| +|pull.consumer.cache.loadFactor|0.75|the configs for consumer cache| + +## failOnDataLoss + +Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. 
You can disable it when it doesn't work as you expected. + + +## RocketMQ Receiver (Using Consumer Push Mode) + +* RocketMQReceiver - which provides no fault-tolerance guarantees +* ReliableRocketMQReceiver - which provides fault-tolerance guarantees + +### example: +``` + SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); + JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); + Properties properties = new Properties(); + properties.setProperty(RocketMQConfig.NAME_SERVER_ADDR, NAMESERVER_ADDR); + properties.setProperty(RocketMQConfig.CONSUMER_GROUP, CONSUMER_GROUP); + properties.setProperty(RocketMQConfig.CONSUMER_TOPIC, CONSUMER_TOPIC); + + // no fault-tolerance guarantees + JavaInputDStream ds = RocketMQUtils.createInputDStream(jssc, properties, StorageLevel.MEMORY_ONLY()); + // fault-tolerance guarantees + // JavaInputDStream ds = RocketMQUtils.createReliableInputDStream(jssc, properties, StorageLevel.MEMORY_ONLY()); + ds.print(); + jssc.start(); + jssc.awaitTerminationOrTimeout(60000); +``` http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/pom.xml ---------------------------------------------------------------------- diff --git a/rocketmq-spark/pom.xml b/rocketmq-spark/pom.xml new file mode 100644 index 0000000..91c8915 --- /dev/null +++ b/rocketmq-spark/pom.xml @@ -0,0 +1,193 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-spark</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>jar</packaging> + + <developers> + <developer> + <id>vesense</id> + <name>Xin Wang</name> + <email>[email protected]</email> + </developer> + <developer> + <id>hustfxj</id> + <name>John Fang</name> + <email>[email protected]</email> + </developer> + </developers> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <!--maven properties --> + <maven.test.skip>true</maven.test.skip> + <maven.javadoc.skip>true</maven.javadoc.skip> + <!-- compiler settings properties --> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + <rocketmq.version>4.0.0-incubating</rocketmq.version> + <spark.version>2.1.0</spark.version> + <scala.version>2.11.8</scala.version> + <scala.binary.version>2.11</scala.binary.version> + <commons-lang.version>2.5</commons-lang.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> + 
<version>${rocketmq.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-common</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>${commons-lang.version}</version> + </dependency> + + <!--test --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + <version>4.12</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-namesrv</artifactId> + <version>${rocketmq.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-broker</artifactId> + <version>${rocketmq.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.5.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <encoding>UTF-8</encoding> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.12.4</version> + <configuration> + <skipTests>${maven.test.skip}</skipTests> + </configuration> + </plugin> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.5</version> + <executions> + <execution> + <id>compile-scala</id> + 
<phase>compile</phase> + <goals> + <goal>add-source</goal> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>test-compile-scala</id> + <phase>test-compile</phase> + <goals> + <goal>add-source</goal> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <scalaVersion>${scala.version}</scalaVersion> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <version>0.12</version> + <configuration> + <excludes> + <exclude>README.md</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <executions> + <execution> + <id>verify</id> + <phase>verify</phase> + <configuration> + <configLocation>style/rmq_checkstyle.xml</configLocation> + <encoding>UTF-8</encoding> + <consoleOutput>true</consoleOutput> + <failsOnError>true</failsOnError> + <includeTestSourceDirectory>false</includeTestSourceDirectory> + <includeTestResources>false</includeTestResources> + </configuration> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/main/java/org/apache/rocketmq/spark/OffsetCommitCallback.java ---------------------------------------------------------------------- diff --git a/rocketmq-spark/src/main/java/org/apache/rocketmq/spark/OffsetCommitCallback.java b/rocketmq-spark/src/main/java/org/apache/rocketmq/spark/OffsetCommitCallback.java new file mode 100644 index 0000000..8dca49b --- /dev/null +++ b/rocketmq-spark/src/main/java/org/apache/rocketmq/spark/OffsetCommitCallback.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.spark; + +import org.apache.rocketmq.common.message.MessageQueue; +import java.util.Map; + +/** + * A callback interface that the user can implement to trigger custom actions when a commit request completes. + */ +public interface OffsetCommitCallback { + /** + * A callback method the user can implement to provide asynchronous handling of commit request completion. + * This method will be called by InputDstream when the last batch is handled successfully. 
+ * @param offsets the offsets which already are handled successfully + * @param exception The exception thrown during processing of the request, or null if the commit completed successfully + */ + void onComplete(Map<MessageQueue, Long> offsets, Exception exception); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/main/java/org/apache/rocketmq/spark/RocketMQConfig.java ---------------------------------------------------------------------- diff --git a/rocketmq-spark/src/main/java/org/apache/rocketmq/spark/RocketMQConfig.java b/rocketmq-spark/src/main/java/org/apache/rocketmq/spark/RocketMQConfig.java new file mode 100644 index 0000000..91b8e8e --- /dev/null +++ b/rocketmq-spark/src/main/java/org/apache/rocketmq/spark/RocketMQConfig.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.spark; + +import org.apache.commons.lang.Validate; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.remoting.common.RemotingUtil; + +import java.util.Properties; +import java.util.UUID; + +/** + * RocketMQConfig for Consumer + */ +public class RocketMQConfig { + // ------- the following is for common usage ------- + /** + * RocketMq name server address + */ + public static final String NAME_SERVER_ADDR = "nameserver.addr"; // Required + + public static final String CLIENT_NAME = "client.name"; + + public static final String CLIENT_IP = "client.ip"; + public static final String DEFAULT_CLIENT_IP = RemotingUtil.getLocalAddress(); + + public static final String CLIENT_CALLBACK_EXECUTOR_THREADS = "client.callback.executor.threads"; + public static final int DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS = Runtime.getRuntime().availableProcessors();; + + public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval"; + public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds + + public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval"; + public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds + + + // ------- the following is for push consumer mode ------- + /** + * RocketMq consumer group + */ + public static final String CONSUMER_GROUP = "consumer.group"; // Required + + /** + * RocketMq consumer topic + */ + public static final String CONSUMER_TOPIC = "consumer.topic"; // Required + + public static final String CONSUMER_TAG = "consumer.tag"; + public static final String DEFAULT_TAG = "*"; + + public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to"; + public static final String 
CONSUMER_OFFSET_LATEST = "latest"; + public static final String CONSUMER_OFFSET_EARLIEST = "earliest"; + public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp"; + + public static final String CONSUMER_MESSAGES_ORDERLY = "consumer.messages.orderly"; + + public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval"; + public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds + + public static final String CONSUMER_MIN_THREADS = "consumer.min.threads"; + public static final int DEFAULT_CONSUMER_MIN_THREADS = 20; + + public static final String CONSUMER_MAX_THREADS = "consumer.max.threads"; + public static final int DEFAULT_CONSUMER_MAX_THREADS = 64; + + + // ------- the following is for reliable Receiver ------- + public static final String QUEUE_SIZE = "spout.queue.size"; + public static final int DEFAULT_QUEUE_SIZE = 500; + + public static final String MESSAGES_MAX_RETRY = "spout.messages.max.retry"; + public static final int DEFAULT_MESSAGES_MAX_RETRY = 3; + + public static final String MESSAGES_TTL = "spout.messages.ttl"; + public static final int DEFAULT_MESSAGES_TTL = 300000; // 5min + + + // ------- the following is for pull consumer mode ------- + + /** + * Maximum rate (number of records per second) at which data will be read from each RocketMq partition , + * and the default value is "-1", it means consumer can pull message from rocketmq as fast as the consumer can. + * Other that, you also enables or disables Spark Streaming's internal backpressure mechanism by the config + * "spark.streaming.backpressure.enabled". + */ + public static final String MAX_PULL_SPEED_PER_PARTITION = "pull.max.speed.per.partition"; + + /** + * To pick up the consume speed, the consumer can pull a batch of messages at a time. 
/**
 * Populate a push consumer from the given properties: consumer group,
 * offset-persist interval, consume-thread pool bounds, initial-offset policy,
 * and the topic/tag subscription.
 *
 * @param props    user-supplied configuration; must contain CONSUMER_GROUP and CONSUMER_TOPIC
 * @param consumer the push consumer to configure
 * @throws IllegalArgumentException if group/topic is missing or the subscribe call fails
 */
public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer) {
    buildCommonConfigs(props, consumer);

    String group = props.getProperty(CONSUMER_GROUP);
    Validate.notEmpty(group);
    consumer.setConsumerGroup(group);

    consumer.setPersistConsumerOffsetInterval(RocketMqUtils.getInteger(props,
        CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
    consumer.setConsumeThreadMin(RocketMqUtils.getInteger(props,
        CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS));
    consumer.setConsumeThreadMax(RocketMqUtils.getInteger(props,
        CONSUMER_MAX_THREADS, DEFAULT_CONSUMER_MAX_THREADS));

    // Where to start when the broker has no checkpoint for this group.
    String initOffset = props.getProperty(CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
    switch (initOffset) {
        case CONSUMER_OFFSET_EARLIEST:
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            break;
        case CONSUMER_OFFSET_LATEST:
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            break;
        case CONSUMER_OFFSET_TIMESTAMP:
            // BUGFIX: the original never switched ConsumeFromWhere here, so the
            // "timestamp" mode silently behaved like the broker default.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            // NOTE(review): in this branch initOffset is the literal string
            // "timestamp", which is not a valid "yyyyMMddHHmmss" value for
            // setConsumeTimestamp; the actual timestamp should come from a
            // dedicated property — TODO confirm the intended config key.
            consumer.setConsumeTimestamp(initOffset);
            break;
        default:
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    }

    String topic = props.getProperty(CONSUMER_TOPIC);
    Validate.notEmpty(topic);
    try {
        consumer.subscribe(topic, props.getProperty(CONSUMER_TAG, DEFAULT_TAG));
    } catch (MQClientException e) {
        throw new IllegalArgumentException(e);
    }
}

/**
 * Apply settings shared by all RocketMQ clients: name-server address (required),
 * client IP/name, callback threads, and the name-server / broker heartbeat intervals.
 *
 * @param props  user-supplied configuration; must contain NAME_SERVER_ADDR
 * @param client the client to configure
 */
public static void buildCommonConfigs(Properties props, ClientConfig client) {
    String namesvr = props.getProperty(NAME_SERVER_ADDR);
    Validate.notEmpty(namesvr);
    client.setNamesrvAddr(namesvr);

    client.setClientIP(props.getProperty(CLIENT_IP, DEFAULT_CLIENT_IP));
    // use UUID for client name by default so concurrent clients on one host don't collide
    String defaultClientName = UUID.randomUUID().toString();
    client.setInstanceName(props.getProperty(CLIENT_NAME, defaultClientName));

    client.setClientCallbackExecutorThreads(RocketMqUtils.getInteger(props,
        CLIENT_CALLBACK_EXECUTOR_THREADS, DEFAULT_CLIENT_CALLBACK_EXECUTOR_THREADS));
    client.setPollNameServerInteval(RocketMqUtils.getInteger(props,
        NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
    client.setHeartbeatBrokerInterval(RocketMqUtils.getInteger(props,
        BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
}
}
package org.apache.rocketmq.spark;

import java.io.Serializable;

/**
 * Immutable identifier of a single message queue: a (topic, queueId) pair.
 * Safe to use as a hash-map key; the hash code is computed lazily and cached.
 */
public final class TopicQueueId implements Serializable {

    // Cached hash; 0 means "not computed yet". On the rare hash collision with
    // 0 the value is simply recomputed each call, which is harmless.
    private int hash = 0;
    private final int queueId;
    private final String topic;

    public TopicQueueId(String topic, int queueId) {
        this.queueId = queueId;
        this.topic = topic;
    }

    /** @return the queue index within the topic */
    public int queueId() {
        return queueId;
    }

    /** @return the topic name (may be null) */
    public String topic() {
        return topic;
    }

    @Override
    public int hashCode() {
        int h = hash;
        if (h == 0) {
            final int prime = 31;
            // same formula as the classic IDE-generated hash: 31*(31 + queueId) + topicHash
            h = prime * (prime + queueId) + ((topic == null) ? 0 : topic.hashCode());
            hash = h;
        }
        return h;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        TopicQueueId that = (TopicQueueId) obj;
        if (queueId != that.queueId) {
            return false;
        }
        return (topic == null) ? (that.topic == null) : topic.equals(that.topic);
    }

    @Override
    protected TopicQueueId clone() throws CloneNotSupportedException {
        // The class is immutable, so a field-by-field copy is equivalent.
        return new TopicQueueId(this.topic, this.queueId);
    }

    @Override
    public String toString() {
        return topic + "-" + queueId;
    }

}
package org.apache.rocketmq.spark.streaming;

import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Default {@link MessageRetryManager}: keeps in-flight message sets in a cache,
 * re-queues failed or TTL-expired sets for redelivery until {@code maxRetry}
 * attempts have been used, then drops them.
 */
public class DefaultMessageRetryManager implements MessageRetryManager {
    private Map<String, MessageSet> cache = new ConcurrentHashMap<>(500);
    private BlockingQueue<MessageSet> queue;
    private int maxRetry;
    private int ttl;

    /**
     * @param queue    queue the receiver drains; retried sets are re-offered here
     * @param maxRetry maximum redelivery attempts per message set
     * @param ttl      ms after mark() without ack/fail before a set is failed automatically
     */
    public DefaultMessageRetryManager(BlockingQueue<MessageSet> queue, final int maxRetry, final int ttl) {
        this.queue = queue;
        this.maxRetry = maxRetry;
        this.ttl = ttl;

        long period = 5000;
        // BUGFIX: run the expiry sweep on a daemon Timer. The original used a
        // non-daemon Timer whose thread is never cancelled, so every instance
        // leaked a thread that could keep the JVM alive after shutdown.
        new Timer(true).scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                long now = System.currentTimeMillis();
                // ConcurrentHashMap iteration is weakly consistent, so fail()
                // removing entries while we iterate here is safe.
                for (Map.Entry<String, MessageSet> entry : cache.entrySet()) {
                    String id = entry.getKey();
                    MessageSet messageSet = entry.getValue();
                    if (now - messageSet.getTimestamp() >= ttl) { // no ack/fail received in ttl
                        fail(id);
                    }
                }
            }
        }, period, period);
    }

    /** The set identified by {@code id} was stored successfully; stop tracking it. */
    public void ack(String id) {
        cache.remove(id);
    }

    /**
     * Record a failure: while retries remain, bump the retry count and re-queue
     * the set; otherwise it is dropped permanently. No-op for unknown ids.
     */
    public void fail(String id) {
        MessageSet messageSet = cache.remove(id);
        if (messageSet == null) {
            return;
        }

        if (needRetry(messageSet)) {
            messageSet.setRetries(messageSet.getRetries() + 1);
            messageSet.setTimestamp(0);
            // NOTE(review): offer() drops the retry silently when the queue is
            // full — confirm whether a blocking put() (at the cost of stalling
            // the timer thread) is the intended behavior.
            queue.offer(messageSet);
        }
    }

    /** Mark a set as in-flight, timestamped so the TTL sweep can expire it. */
    public void mark(MessageSet messageSet) {
        messageSet.setTimestamp(System.currentTimeMillis());
        cache.put(messageSet.getId(), messageSet);
    }

    /** @return true while the set has redelivery attempts left */
    public boolean needRetry(MessageSet messageSet) {
        return messageSet.getRetries() < maxRetry;
    }

    // just for testing
    public void setCache(Map<String, MessageSet> cache) {
        this.cache = cache;
    }
}
package org.apache.rocketmq.spark.streaming;

/**
 * Tracks in-flight {@link MessageSet}s for a reliable receiver and decides
 * whether a failed set should be redelivered.
 */
public interface MessageRetryManager {
    /**
     * The message set identified by {@code id} was processed successfully;
     * stop tracking it.
     * @param id message-set id
     */
    void ack(String id);

    /**
     * The message set identified by {@code id} failed; the implementation may
     * re-queue it for another attempt or drop it.
     * @param id message-set id
     */
    void fail(String id);

    /**
     * Register a message set as in-flight so that a later ack/fail (or timeout)
     * can be matched to it.
     * @param messageSet the set being handed to the store
     */
    void mark(MessageSet messageSet);

    /**
     * Whether the given message set still has redelivery attempts left.
     * @param messageSet the set to check
     * @return true if it should be retried on failure
     */
    boolean needRetry(MessageSet messageSet);

}
package org.apache.rocketmq.spark.streaming;

import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

/**
 * An identified batch of messages handed from the consumer to the receiver,
 * carrying retry bookkeeping (in-flight timestamp and retry count).
 */
public class MessageSet implements Iterator<Message>, Serializable {
    private String id;
    private List<MessageExt> data;
    private long timestamp; // last time this set was marked in-flight
    private int retries;    // redelivery attempts used so far

    // BUGFIX: the original hasNext()/next()/remove() called data.iterator() on
    // every invocation, creating a fresh iterator each time — next() always
    // returned the first element and hasNext() never became false for a
    // non-empty list, so iterating a MessageSet looped forever. Keep a single
    // cursor instead. Transient because iterators are not serializable; it is
    // lazily re-created after deserialization (iteration restarts from the
    // beginning, matching the original's observable state after a round trip).
    private transient Iterator<MessageExt> cursor;

    public MessageSet(String id, List<MessageExt> data) {
        this.id = id;
        this.data = data;
    }

    /** Creates a set with a random UUID as its id. */
    public MessageSet(List<MessageExt> data) {
        this(UUID.randomUUID().toString(), data);
    }

    public String getId() {
        return id;
    }

    public List<MessageExt> getData() {
        return data;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public int getRetries() {
        return retries;
    }

    public void setRetries(int retries) {
        this.retries = retries;
    }

    // Lazily create (or re-create after deserialization) the single cursor.
    private Iterator<MessageExt> cursor() {
        if (cursor == null) {
            cursor = data.iterator();
        }
        return cursor;
    }

    @Override
    public boolean hasNext() {
        return cursor().hasNext();
    }

    @Override
    public Message next() {
        return cursor().next();
    }

    @Override
    public void remove() {
        cursor().remove();
    }

    @Override
    public String toString() {
        return data.toString();
    }
}
package org.apache.rocketmq.spark.streaming;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spark.RocketMQConfig;
import org.apache.spark.storage.StorageLevel;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.rocketmq.spark.RocketMqUtils;

/**
 * Reliable variant of {@link RocketMQReceiver}: instead of storing messages
 * directly from the consumer callback, batches are parked on a bounded queue
 * and a dedicated sender thread stores them into Spark, ack'ing on success and
 * routing failures through a {@link MessageRetryManager} for redelivery.
 */
public class ReliableRocketMQReceiver extends RocketMQReceiver {
    // Bounded hand-off between the consumer callback and the sender thread.
    private BlockingQueue<MessageSet> queue;
    // Tracks in-flight sets; re-queues failed/expired ones up to maxRetry.
    private MessageRetryManager messageRetryManager;
    private MessageSender sender;

    public ReliableRocketMQReceiver(Properties properties, StorageLevel storageLevel) {
        super(properties, storageLevel);
    }

    @Override
    public void onStart() {
        // Queue/retry settings come from the receiver properties, with defaults.
        int queueSize = RocketMqUtils.getInteger(properties, RocketMQConfig.QUEUE_SIZE, RocketMQConfig.DEFAULT_QUEUE_SIZE);
        queue = new LinkedBlockingQueue<>(queueSize);

        int maxRetry = RocketMqUtils.getInteger(properties, RocketMQConfig.MESSAGES_MAX_RETRY, RocketMQConfig.DEFAULT_MESSAGES_MAX_RETRY);
        int ttl = RocketMqUtils.getInteger(properties, RocketMQConfig.MESSAGES_TTL, RocketMQConfig.DEFAULT_MESSAGES_TTL);
        this.messageRetryManager = new DefaultMessageRetryManager(queue, maxRetry, ttl);

        // Daemon sender so it cannot keep the executor JVM alive on its own.
        this.sender = new MessageSender();
        this.sender.setName("MessageSender");
        this.sender.setDaemon(true);
        this.sender.start();

        // Start the push consumer last, once the pipeline is ready (super
        // registers the listener that calls our process()).
        super.onStart();
    }

    /**
     * Consumer callback: enqueue the batch for the sender thread.
     * Returning false (queue full) makes the listener request redelivery.
     */
    public boolean process(List<MessageExt> msgs) {
        if (msgs.isEmpty()) {
            return true;
        }
        MessageSet messageSet = new MessageSet(msgs);
        // returning true upon success and false if this queue is full.
        return queue.offer(messageSet);
    }

    /** Mark the message set with this id as successfully stored. */
    public void ack(Object msgId) {
        String id = msgId.toString();
        messageRetryManager.ack(id);
    }

    /** Mark the message set with this id as failed; it may be re-queued. */
    public void fail(Object msgId) {
        String id = msgId.toString();
        messageRetryManager.fail(id);
    }

    @Override
    public void onStop() {
        // NOTE(review): consumer is inherited from the superclass and could be
        // null if onStart never ran or failed before creating it — confirm
        // whether a null guard is needed here.
        consumer.shutdown();
    }

    // Drains the queue and stores batches into Spark, one at a time.
    class MessageSender extends Thread {
        @Override
        public void run() {
            while (ReliableRocketMQReceiver.this.isStarted()) {
                MessageSet messageSet = null;
                try {
                    messageSet = queue.take();
                } catch (InterruptedException e) {
                    // Interrupted (e.g. during shutdown): re-check isStarted().
                    continue;
                }
                if (messageSet == null) {
                    continue;
                }

                // Mark before store so a crash between the two still expires
                // the set via TTL and triggers a retry.
                messageRetryManager.mark(messageSet);
                try {
                    // According to the official docs
                    // 'To implement a reliable receiver, you have to use store(multiple-records) to store data'
                    ReliableRocketMQReceiver.this.store(messageSet);
                    ack(messageSet.getId());
                } catch (Exception e) {
                    fail(messageSet.getId());
                }
            }
        }
    }

}
package org.apache.rocketmq.spark.streaming;

import org.apache.commons.lang.Validate;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spark.RocketMQConfig;
import org.apache.rocketmq.spark.RocketMqUtils;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import java.util.List;
import java.util.Properties;

/**
 * RocketMQReceiver uses MQPushConsumer as the default implementation.
 * PushConsumer is a high level consumer API, wrapping the pulling details;
 * it looks like the broker pushes messages to the consumer.
 *
 * NOTE: This gives no fault-tolerance guarantees and can lose data on receiver
 * failure. Prefer ReliableRocketMQReceiver for fault-tolerance guarantees.
 */
public class RocketMQReceiver extends Receiver<Message> {
    protected MQPushConsumer consumer;
    // true => preserve per-queue ordering via the orderly listener
    protected boolean ordered;
    protected Properties properties;

    public RocketMQReceiver(Properties properties, StorageLevel storageLevel) {
        super(storageLevel);
        this.properties = properties;
    }

    @Override
    public void onStart() {
        Validate.notEmpty(properties, "Consumer properties can not be empty");
        ordered = RocketMqUtils.getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);

        consumer = new DefaultMQPushConsumer();
        RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer) consumer);

        if (ordered) {
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                           ConsumeOrderlyContext context) {
                    // Suspend the queue briefly on failure so the batch is redelivered.
                    if (process(msgs)) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    } else {
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
            });
        } else {
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    // RECONSUME_LATER asks the broker to redeliver the batch.
                    if (process(msgs)) {
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } else {
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
        }

        try {
            consumer.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Store each message into Spark.
     * @return true on success; false asks the listener to request redelivery
     */
    public boolean process(List<MessageExt> msgs) {
        if (msgs.isEmpty()) {
            return true;
        }
        try {
            for (MessageExt msg : msgs) {
                this.store(msg);
            }
            return true;
        } catch (Exception e) {
            // store() can fail transiently (e.g. receiver stopping); signal
            // redelivery instead of crashing the listener thread.
            return false;
        }
    }

    @Override
    public void onStop() {
        // BUGFIX: guard against NPE when onStop is invoked although onStart
        // never ran (or failed before constructing the consumer) — Spark may
        // call onStop in that situation, and the NPE would mask the real error.
        if (consumer != null) {
            consumer.shutdown();
        }
    }

}
/**
 * Consumer of a single topic partition (one queueId across the given broker
 * names), intended for cached reuse. One underlying pull client is shared per
 * groupId; this object tracks per-broker read cursors on top of it.
 */
private[rocketmq]
class CachedMQConsumer private(
    val groupId: String,
    val client: DefaultMQPullConsumer,
    val topic: String,
    val queueId: Int,
    val names: Set[String],
    val optionParams: ju.Map[String, String]) extends Logging {

  // Max messages fetched per pull; "32" if PULL_MAX_BATCH_SIZE is unset.
  private val maxBatchSize = optionParams.getOrDefault(RocketMQConfig.PULL_MAX_BATCH_SIZE, "32").toInt

  // Per-broker-name iterator over the last pulled batch (starts empty).
  private var buffer = names.map(name => name -> ju.Collections.emptyList[MessageExt].iterator).toMap

  // Per-broker-name next expected offset; -2 means "not positioned yet".
  private var nextOffsets = names.map(name => name -> -2L).toMap


  /**
   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
   * Sequential forward access will use buffers, but random access will be horribly inefficient.
   */
  def get(name: String, queueOffset: Long): MessageExt = {

    val nextOffset = nextOffsets(name)
    logDebug(s"Get $groupId $topic $queueId brokerName $name nextOffset $nextOffset requested")

    // Non-sequential request: reposition by pulling at the requested offset.
    if (queueOffset != nextOffset) {
      logInfo(s"Initial fetch for $groupId $topic $name $queueOffset")
      poll(name, queueOffset)
    }

    // Buffer exhausted: fetch the next batch.
    if (!buffer(name).hasNext) {
      poll(name, queueOffset)
    }

    val iter = buffer(name)
    if(iter.hasNext) {
      val record = iter.next
      // Sanity check: the batch must line up exactly with the requested offset.
      assert(record.getQueueOffset == queueOffset,
        s"Got wrong record for $groupId $topic $queueId $name even after seeking to offset $queueOffset")
      nextOffsets += (name -> (queueOffset + 1))
      record
    } else {
      throw new IllegalStateException(s"Failed to get records for $groupId $topic $queueId $name $queueOffset after polling ")
    }
  }

  // Pull a batch at queueOffset into buffer(name), retrying while the broker
  // reports anything other than FOUND.
  // NOTE(review): the loop sleeps 100 ms per retry and still issues one extra
  // pull after i exceeds 10 before throwing — presumably a simple bounded
  // busy-retry; confirm the intended retry budget (~11 pulls / ~1 s).
  private def poll(name: String, queueOffset: Long) {
    var p = client.pull(new MessageQueue(topic, name, queueId), "*", queueOffset, maxBatchSize)
    var i = 0
    while (p.getPullStatus != PullStatus.FOUND){
      // it maybe not get the message, so we will retry
      Thread.sleep(100)
      logError(s"Polled failed for $queueId $name $queueOffset $maxBatchSize ${p.toString}")
      i = i + 1
      p = client.pull(new MessageQueue(topic, name, queueId), "*", queueOffset, maxBatchSize)
      if (i > 10){
        throw new IllegalStateException(s"Failed to get records for $groupId $topic $queueId $name $queueOffset after polling," +
          s"due to ${p.toString}")
      }
    }
    buffer += (name -> p.getMsgFoundList.iterator)
  }
}

/**
 * Companion cache of [[CachedMQConsumer]] instances keyed by
 * (groupId, topic, queueId, names), plus one shared pull client per groupId.
 */
object CachedMQConsumer extends Logging {

  private case class CacheKey(groupId: String, topic: String, queueId: Int, names: Set[String])

  // One DefaultMQPullConsumer client shared by all consumers of a groupId.
  private var groupIdToClient = Map[String, DefaultMQPullConsumer]()

  // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap
  // (access-ordered, so removeEldestEntry evicts the least-recently-used entry).
  private var cache: ju.LinkedHashMap[CacheKey, CachedMQConsumer] = null

  /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
  def init(
      initialCapacity: Int,
      maxCapacity: Int,
      loadFactor: Float): Unit = CachedMQConsumer.synchronized {
    if (null == cache) {
      logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor")
      cache = new ju.LinkedHashMap[CacheKey, CachedMQConsumer](
        initialCapacity, loadFactor, true) {
        override def removeEldestEntry(
            entry: ju.Map.Entry[CacheKey, CachedMQConsumer]): Boolean = {
          // Evict the LRU entry once the cache grows past maxCapacity.
          if (this.size > maxCapacity) {
            true
          } else {
            false
          }
        }
      }
    }
  }
+ */ + def getOrCreate( + groupId: String, + topic: String, + queueId: Int, + names: Set[String], + optionParams: ju.Map[String, String]): CachedMQConsumer = + CachedMQConsumer.synchronized { + + val client = if (!groupIdToClient.contains(groupId)){ + val client = RocketMqUtils.mkPullConsumerInstance(groupId, optionParams, s"$groupId-executor") + groupIdToClient += groupId -> client + client + } else { + groupIdToClient(groupId) + } + + val k = CacheKey(groupId, topic, queueId, names) + if (cache.containsValue(k)) { + cache.get(k) + } else { + logInfo(s"Cache miss for $k") + logDebug(cache.keySet.toString) + val c= new CachedMQConsumer(groupId, client, topic, queueId, names, optionParams) + cache.put(k, c) + c + } + } + + /** + * Get a fresh new instance, unassociated with the global cache. + * Caller is responsible for closing + */ + def getUncached( + groupId: String, + topic: String, + queueId: Int, + names: Set[String], + optionParams: ju.Map[String, String]): CachedMQConsumer = { + val client = RocketMqUtils.mkPullConsumerInstance(groupId, optionParams, + s"$groupId-executor-$queueId-${names.mkString("-")}") + new CachedMQConsumer(groupId, client, topic, queueId, names, optionParams) + } + + /** remove consumer for given groupId, topic, and queueId, if it exists */ + def remove(groupId: String, topic: String, queueId: Int, names: Set[String]): Unit = { + val k = CacheKey(groupId, topic, queueId, names) + logInfo(s"Removing $k from cache") + val v = CachedMQConsumer.synchronized { + cache.remove(k) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7ceba1b2/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/ConsumerStrategy.scala ---------------------------------------------------------------------- diff --git a/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/ConsumerStrategy.scala b/rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/ConsumerStrategy.scala new file mode 100644 index 0000000..0f1dc15 --- 
package org.apache.rocketmq.spark

import java.{util => ju}

import org.apache.rocketmq.common.UtilAll
import org.apache.rocketmq.common.message.MessageQueue
import org.apache.spark.streaming.MQPullInputDStream
import scala.collection.JavaConverters._

/**
 * Specify the start available offset for the rocketmq consumer
 */
sealed abstract class ConsumerStrategy

/**
 * Specify the earliest available offset for the rocketmq consumer to start consuming.
 * But if the rocketmq server has a checkpoint for the [[MessageQueue]], then the consumer will consume from
 * the checkpoint.
 */
case object EarliestStrategy extends ConsumerStrategy

/**
 * Specify the latest available offset for the rocketmq consumer to start consuming.
 * But if the rocketmq server has a checkpoint for the [[MessageQueue]], then the consumer will consume from
 * the checkpoint.
 */
case object LatestStrategy extends ConsumerStrategy

/**
 * Specify the specific available offset for the rocketmq consumer to start consuming.
 * Generally if the rocketmq server has a checkpoint for the [[MessageQueue]], then the consumer will consume from
 * the checkpoint. But if the [[MQPullInputDStream.forceSpecial]] is true, the rocketmq will start consuming from
 * the specific available offset in any case. Of course, the consumer will use the min available offset if a message
 * queue is not specified.
 */
case class SpecificOffsetStrategy(
    queueToOffset: Map[MessageQueue, Long]) extends ConsumerStrategy


object ConsumerStrategy {
  /**
   * Used to denote offset range limits that are resolved via rocketmq
   */
  val LATEST = -1L // indicates resolution to the latest offset
  val EARLIEST = -2L // indicates resolution to the earliest offset

  /** Start from the earliest available offset (subject to server checkpoints). */
  def earliest: ConsumerStrategy =
    org.apache.rocketmq.spark.EarliestStrategy

  /** Start from the latest available offset (subject to server checkpoints). */
  def latest: ConsumerStrategy =
    org.apache.rocketmq.spark.LatestStrategy

  /**
   * Misspelled alias of [[latest]], kept for backward compatibility with
   * existing callers.
   */
  @deprecated("Use latest instead", "")
  def lastest: ConsumerStrategy =
    org.apache.rocketmq.spark.LatestStrategy

  /** Start each given queue from an explicit offset. */
  def specificOffset(queueToOffset: ju.Map[MessageQueue, Long]): ConsumerStrategy = {
    // The original mapped each pair to itself before toMap; a plain conversion
    // is equivalent.
    SpecificOffsetStrategy(queueToOffset.asScala.toMap)
  }

  /**
   * Start each given queue from the offset corresponding to a wall-clock time
   * (format "yyyyMMddHHmmss", parsed via [[UtilAll.parseDate]]).
   */
  def specificTime(queueToTime: ju.Map[MessageQueue, String]): ConsumerStrategy = {
    val queueToOffset = queueToTime.asScala.map{ case (q, t) =>
      val offset = UtilAll.parseDate(t, UtilAll.YYYY_MMDD_HHMMSS).getTime
      (q, offset)
    }.toMap
    SpecificOffsetStrategy(queueToOffset)
  }

}
package org.apache.rocketmq.spark

import java.{util => ju}

import scala.collection.JavaConverters._


/**
 * :: Experimental ::
 * Choice of how to schedule consumers for a given [[TopicQueueId]] on an executor.
 * See [[LocationStrategy]] to obtain instances.
 * RocketMq consumers prefetch messages, so it's important for performance
 * to keep cached consumers on appropriate executors, not recreate them for every partition.
 * Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere.
 */

sealed abstract class LocationStrategy

// Distribute partitions evenly across all available executors.
case object PreferConsistent extends LocationStrategy

// Pin specific TopicQueueIds to specific hosts; unspecified ids fall back to
// consistent placement.
case class PreferFixed(hostMap: ju.Map[TopicQueueId, String]) extends LocationStrategy

/**
 * Factory object to obtain instances of [[LocationStrategy]].
 *
 * NOTE(review): the factory methods intentionally share the UpperCamelCase
 * names of the strategy types (a Java-friendly API style); the bodies use
 * fully qualified names to refer to the case object unambiguously.
 */
object LocationStrategy {

  /**
   * Use this in most cases, it will consistently distribute partitions across all executors.
   */
  def PreferConsistent: LocationStrategy =
    org.apache.rocketmq.spark.PreferConsistent

  /**
   * Use this to place particular TopicQueueIds on particular hosts if your load is uneven.
   * Any TopicQueueId not specified in the map will use a consistent location.
   * (Scala-map overload; the map is defensively copied into a java.util.HashMap.)
   */
  def PreferFixed(hostMap: collection.Map[TopicQueueId, String]): LocationStrategy =
    new PreferFixed(new ju.HashMap[TopicQueueId, String](hostMap.asJava))

  /**
   * Use this to place particular TopicQueueIds on particular hosts if your load is uneven.
   * Any TopicQueueId not specified in the map will use a consistent location.
   */
  def PreferFixed(hostMap: ju.Map[TopicQueueId, String]): LocationStrategy =
    new PreferFixed(hostMap)
}
package org.apache.rocketmq.spark

import org.slf4j.{Logger, LoggerFactory}

/**
 * Utility trait for classes that want to log data.
 *
 * Messages are passed by-name so they are only constructed when the
 * corresponding log level is actually enabled.
 */
trait Logging {
  // Make the log field transient so that objects with Logging can
  // be serialized and used on another machine; the logger is lazily
  // re-created after deserialization.
  @transient private var log_ : Logger = null

  /** Get or lazily create the logger, named after the concrete class. */
  def log: Logger = {
    if (log_ == null) {
      // strip the trailing "$" so Scala objects log under the plain class name
      log_ = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
    }
    log_ // idiomatic Scala: last expression is the result, no `return`
  }

  // Log methods that take only a String
  def logInfo(msg: => String): Unit = if (log.isInfoEnabled) log.info(msg)

  def logDebug(msg: => String): Unit = if (log.isDebugEnabled) log.debug(msg)

  def logWarning(msg: => String): Unit = if (log.isWarnEnabled) log.warn(msg)

  def logError(msg: => String): Unit = if (log.isErrorEnabled) log.error(msg)

  // Log methods that take Throwable (Exceptions/Errors) too
  def logInfo(msg: => String, throwable: Throwable): Unit =
    if (log.isInfoEnabled) log.info(msg, throwable)

  def logDebug(msg: => String, throwable: Throwable): Unit =
    if (log.isDebugEnabled) log.debug(msg, throwable)

  def logWarning(msg: => String, throwable: Throwable): Unit =
    if (log.isWarnEnabled) log.warn(msg, throwable)

  def logError(msg: => String, throwable: Throwable): Unit =
    if (log.isErrorEnabled) log.error(msg, throwable)
}
package org.apache.rocketmq.spark

import java.{util => ju}

import org.apache.rocketmq.common.message.MessageQueue

/**
 * Implemented by RDDs/streams whose partitions can report the rocketmq
 * offset ranges they cover, keyed by topic + queue id.
 */
trait HasOffsetRanges {
  def offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]]
}

/**
 * Implemented by input streams that can commit consumed offsets back to rocketmq.
 */
trait CanCommitOffsets {
  /**
   * Queue up offset ranges for commit to rocketmq at a future time. Threadsafe.
   * This is only needed if you intend to store offsets in rocketmq, instead of your own store.
   * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
   */
  def commitAsync(offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]]): Unit

  /**
   * Queue up offset ranges for commit to rocketmq at a future time. Threadsafe.
   * This is only needed if you intend to store offsets in rocketmq, instead of your own store.
   * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
   * @param callback Only the most recently provided callback will be used at commit.
   */
  def commitAsync(offsetRanges: ju.Map[TopicQueueId, Array[OffsetRange]], callback: OffsetCommitCallback): Unit
}
/**
 * Represents a range of offsets from a single rocketmq messageQueue. Instances of this class
 * can be created with `OffsetRange.create()`.
 *
 * @param topic topic name
 * @param queueId queue id inside the broker
 * @param brokerName the broker name
 * @param fromOffset Inclusive starting offset
 * @param untilOffset Exclusive ending offset
 */
final class OffsetRange private(
    val topic: String,
    val queueId: Int,
    val brokerName: String,
    val fromOffset: Long,
    val untilOffset: Long) extends Serializable {
  import OffsetRange.OffsetRangeTuple

  /** rocketmq topicMessageQueue object, for convenience */
  def topicMessageQueue(): MessageQueue = new MessageQueue(topic, brokerName, queueId)

  /** Number of messages this OffsetRange refers to */
  def count(): Long = {
    val ret = untilOffset - fromOffset
    // fromOffset is inclusive and untilOffset exclusive, so a valid range is never negative.
    // (message typo fixed: "form" -> "from")
    assert(ret >= 0, s"OffsetRange happened errors from $topic $brokerName $fromOffset to $untilOffset")
    ret
  }

  override def equals(obj: Any): Boolean = obj match {
    case that: OffsetRange =>
      this.topic == that.topic &&
        this.queueId == that.queueId &&
        this.brokerName == that.brokerName &&
        this.fromOffset == that.fromOffset &&
        this.untilOffset == that.untilOffset
    case _ => false
  }

  // hashCode is derived from the same field tuple equals compares, keeping the
  // equals/hashCode contract intact
  override def hashCode(): Int = toTuple.hashCode()

  override def toString(): String =
    s"OffsetRange(topic: '$topic', queueId: $queueId, brokerName: $brokerName, range: [$fromOffset -> $untilOffset])"

  /** this is to avoid ClassNotFoundException during checkpoint restore */
  def toTuple: OffsetRangeTuple = (topic, queueId, brokerName, fromOffset, untilOffset)
}
/**
 * Companion object that provides methods to create instances of [[OffsetRange]].
 */
object OffsetRange {

  /** this is to avoid ClassNotFoundException during checkpoint restore */
  type OffsetRangeTuple = (String, Int, String, Long, Long)

  /** Java-friendly factory; identical to [[apply]]. */
  def create(topic: String, queueId: Int, brokerName: String, fromOffset: Long, untilOffset: Long): OffsetRange =
    apply(topic, queueId, brokerName, fromOffset, untilOffset)

  /** Java-friendly factory; identical to [[apply]]. */
  def create(
      topicMessageQueue: MessageQueue,
      fromOffset: Long,
      untilOffset: Long): OffsetRange =
    apply(topicMessageQueue, fromOffset, untilOffset)

  def apply(topic: String, queueId: Int, brokerName: String, fromOffset: Long, untilOffset: Long): OffsetRange =
    new OffsetRange(topic, queueId, brokerName, fromOffset, untilOffset)

  def apply(
      topicMessageQueue: MessageQueue,
      fromOffset: Long,
      untilOffset: Long): OffsetRange =
    new OffsetRange(
      topicMessageQueue.getTopic,
      topicMessageQueue.getQueueId,
      topicMessageQueue.getBrokerName,
      fromOffset,
      untilOffset)

  /** Rebuild an instance from its checkpoint-safe tuple form. */
  def apply(t: OffsetRangeTuple): OffsetRange =
    new OffsetRange(t._1, t._2, t._3, t._4, t._5)
}
package org.apache.rocketmq.spark

import org.apache.spark.Partition


/**
 * the Partition for RocketMqRDD
 * @param index the partition id for rdd
 * @param topic the rocketmq topic
 * @param queueId the rocketmq queue id
 * @param partitionOffsetRanges Represents a range of offsets from a single partition
 */
class RocketMqRDDPartition(
    val index: Int,
    val topic: String,
    val queueId: Int,
    val partitionOffsetRanges: Array[OffsetRange]
  ) extends Partition {

  /** Number of messages this partition refers to */
  def count(): Long =
    // sum of an empty collection is 0L, so no explicit empty check is needed
    partitionOffsetRanges.map(_.count()).sum

  /** rocketmq TopicQueueId object, for convenience */
  def topicQueueId(): TopicQueueId = new TopicQueueId(topic, queueId)

  /** Distinct broker names covered by this partition's offset ranges. */
  def brokerNames(): Set[String] =
    // a Set is unordered, so the former .sorted before .toSet was a no-op
    partitionOffsetRanges.map(_.brokerName).toSet

  override def toString: String =
    s"$index $topic $queueId ${partitionOffsetRanges.mkString(",")}"
}
