Repository: bahir Updated Branches: refs/heads/master fb752570c -> 9373fa4e7
[BAHIR-182] Spark Streaming PubNub connector Implement new connector for PubNub (https://www.pubnub.com/) which is increasing in popularity as a cloud messaging infrastructure. Closes #70 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/9373fa4e Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/9373fa4e Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/9373fa4e Branch: refs/heads/master Commit: 9373fa4e7feb402f5b85367174afc5ad6b593a04 Parents: fb75257 Author: Lukasz Antoniak <[email protected]> Authored: Thu Nov 22 14:28:44 2018 -0800 Committer: Luciano Resende <[email protected]> Committed: Fri Nov 30 12:03:11 2018 +0100 ---------------------------------------------------------------------- README.md | 9 +- pom.xml | 1 + streaming-pubnub/README.md | 77 +++++++ .../streaming/pubnub/PubNubWordCount.scala | 91 +++++++++ streaming-pubnub/pom.xml | 78 +++++++ .../streaming/pubnub/PubNubInputDStream.scala | 201 +++++++++++++++++++ .../spark/streaming/pubnub/PubNubUtils.scala | 80 ++++++++ .../streaming/LocalJavaStreamingContext.java | 43 ++++ .../streaming/pubnub/JavaPubNubStreamSuite.java | 37 ++++ .../src/test/resources/log4j.properties | 33 +++ .../pubnub/MessageSerializationSuite.scala | 89 ++++++++ .../streaming/pubnub/PubNubStreamSuite.scala | 195 ++++++++++++++++++ 12 files changed, 931 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index ebbaea7..93a50ea 100644 --- a/README.md +++ b/README.md @@ -47,12 +47,15 @@ Each extension currently available in Apache Bahir has an example application lo Currently, each submodule has its own README.md, with information on example usages and API. 
+* [SQL Cloudant](https://github.com/apache/bahir/blob/master/sql-cloudant/README.md) +* [SQL Streaming Akka](https://github.com/apache/bahir/blob/master/sql-streaming-akka/README.md) * [SQL Streaming MQTT](https://github.com/apache/bahir/blob/master/sql-streaming-mqtt/README.md) * [Streaming Akka](https://github.com/apache/bahir/blob/master/streaming-akka/README.md) -* [Streaming Mqtt](https://github.com/apache/bahir/blob/master/streaming-mqtt/README.md) -* [Streaming Zeromq](https://github.com/apache/bahir/blob/master/streaming-zeromq/README.md) +* [Streaming MQTT](https://github.com/apache/bahir/blob/master/streaming-mqtt/README.md) +* [Streaming PubNub](https://github.com/apache/bahir/blob/master/streaming-pubnub/README.md) +* [Streaming Google Pub/Sub](https://github.com/apache/bahir/blob/master/streaming-pubsub/README.md) * [Streaming Twitter](https://github.com/apache/bahir/blob/master/streaming-twitter/README.md) -* [SQL Cloudant](sql-cloudant/README.md) +* [Streaming ZeroMQ](https://github.com/apache/bahir/blob/master/streaming-zeromq/README.md) Furthermore, to generate scaladocs for each module: http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 13407bd..ec419fa 100644 --- a/pom.xml +++ b/pom.xml @@ -83,6 +83,7 @@ <module>streaming-twitter</module> <module>streaming-zeromq</module> <module>streaming-pubsub</module> + <module>streaming-pubnub</module> </modules> <properties> http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/README.md ---------------------------------------------------------------------- diff --git a/streaming-pubnub/README.md b/streaming-pubnub/README.md new file mode 100644 index 0000000..3b4e9d3 --- /dev/null +++ b/streaming-pubnub/README.md @@ -0,0 +1,77 @@ +# Spark Streaming PubNub Connector + +Library for reading data from real-time messaging infrastructure 
[PubNub](https://www.pubnub.com/) using Spark Streaming. + +## Linking + +Using SBT: + + libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubnub" % "{{site.SPARK_VERSION}}" + +Using Maven: + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>spark-streaming-pubnub_{{site.SCALA_BINARY_VERSION}}</artifactId> + <version>{{site.SPARK_VERSION}}</version> + </dependency> + +This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option. +For example, to include it when starting the spark shell: + + $ bin/spark-shell --packages org.apache.bahir:spark-streaming-pubnub_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}} + +Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. +The `--packages` argument can also be used with `bin/spark-submit`. + +## Examples + +The connector leverages the official Java client for the PubNub cloud infrastructure. You can import the `PubNubUtils` +class and create an input stream by calling `PubNubUtils.createStream()` as shown below. Security and performance related +features shall be set up inside the standard `PNConfiguration` object. We advise configuring a reconnection policy so that +temporary network outages do not interrupt the processing job. Users may subscribe to multiple channels and channel groups, +as well as specify a time token to start receiving messages from a given point in time. + +For complete code examples, please review the _examples_ directory. 
+ +### Scala API + + import com.pubnub.api.PNConfiguration + import com.pubnub.api.enums.PNReconnectionPolicy + + import org.apache.spark.streaming.pubnub.{PubNubUtils, SparkPubNubMessage} + + val config = new PNConfiguration + config.setSubscribeKey(subscribeKey) + config.setSecure(true) + config.setReconnectionPolicy(PNReconnectionPolicy.LINEAR) + val channel = "my-channel" + + val pubNubStream: ReceiverInputDStream[SparkPubNubMessage] = PubNubUtils.createStream( + ssc, config, Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2 + ) + +### Java API + + import com.pubnub.api.PNConfiguration + import com.pubnub.api.enums.PNReconnectionPolicy + + import org.apache.spark.streaming.pubnub.PubNubUtils + import org.apache.spark.streaming.pubnub.SparkPubNubMessage + + PNConfiguration config = new PNConfiguration() + config.setSubscribeKey(subscribeKey) + config.setSecure(true) + config.setReconnectionPolicy(PNReconnectionPolicy.LINEAR) + Set<String> channels = new HashSet<String>() {{ + add("my-channel"); + }}; + + ReceiverInputDStream<SparkPubNubMessage> pubNubStream = PubNubUtils.createStream( + ssc, config, channels, Collections.EMPTY_SET, null, + StorageLevel.MEMORY_AND_DISK_SER_2() + ) + +## Unit Test + +Unit tests take advantage of the publicly available _demo_ subscription and publish keys, which have a limited request rate. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/examples/src/main/scala/org/apache/spark/examples/streaming/pubnub/PubNubWordCount.scala ---------------------------------------------------------------------- diff --git a/streaming-pubnub/examples/src/main/scala/org/apache/spark/examples/streaming/pubnub/PubNubWordCount.scala b/streaming-pubnub/examples/src/main/scala/org/apache/spark/examples/streaming/pubnub/PubNubWordCount.scala new file mode 100644 index 0000000..fe8aa1e --- /dev/null +++ b/streaming-pubnub/examples/src/main/scala/org/apache/spark/examples/streaming/pubnub/PubNubWordCount.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming.pubnub + +import com.google.gson.JsonParser +import com.pubnub.api.PNConfiguration +import com.pubnub.api.enums.PNReconnectionPolicy + +import org.apache.spark.SparkConf +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.Milliseconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.pubnub.{PubNubUtils, SparkPubNubMessage} + +/** + * Consumes messages from a PubNub channel and calculates word count. + * For demo purpose, login to PubNub account and produce messages using Debug Console. + * Expected message format: {"text": "Hello, World!"} + * + * Usage: PubNubWordCount <subscribeKey> <channel> <aggregationPeriodMS> + * <subscribeKey> subscribe key + * <channel> channel + * <aggregationPeriodMS> aggregation period in milliseconds + * + * Example: + * $ bin/run-example \ + * org.apache.spark.examples.streaming.pubnub.PubNubWordCount \ + * sub-c-2d245192-ee8d-11e8-b4c3-46cd67be4fbd my-channel 60000 + */ +object PubNubWordCount { + def main(args: Array[String]): Unit = { + if (args.length != 3) { + // scalastyle:off println + System.err.println( + """ + |Usage: PubNubWordCount <subscribeKey> <channel> + | + | <subscribeKey> subscribe key + | <channel> channel + | <aggregationPeriodMS> aggregation period in milliseconds + | + """.stripMargin + ) + // scalastyle:on + System.exit(1) + } + + val Seq(subscribeKey, channel, aggregationPeriod) = args.toSeq + + val sparkConf = new SparkConf().setAppName("PubNubWordCount").setMaster("local[2]") + val ssc = new StreamingContext(sparkConf, Milliseconds(aggregationPeriod.toLong)) + + val config = new PNConfiguration + config.setSubscribeKey(subscribeKey) + config.setSecure(true) + config.setReconnectionPolicy(PNReconnectionPolicy.LINEAR) + + val pubNubStream: ReceiverInputDStream[SparkPubNubMessage] = PubNubUtils.createStream( + ssc, config, 
Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2) + + val wordCounts = pubNubStream + .flatMap( + message => new JsonParser().parse(message.getPayload) + .getAsJsonObject.get("text").getAsString.split("\\s") + ) + .map(word => (word, 1)) + .reduceByKey(_ + _) + + wordCounts.print() + + ssc.start() + ssc.awaitTermination() + } +} + http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-pubnub/pom.xml b/streaming-pubnub/pom.xml new file mode 100644 index 0000000..f233961 --- /dev/null +++ b/streaming-pubnub/pom.xml @@ -0,0 +1,78 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>bahir-parent_2.11</artifactId> + <groupId>org.apache.bahir</groupId> + <version>2.3.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>spark-streaming-pubnub_2.11</artifactId> + <properties> + <sbt.project.name>streaming-pubnub</sbt.project.name> + </properties> + <packaging>jar</packaging> + <name>Apache Bahir - Spark Streaming PubNub</name> + <url>http://bahir.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-tags_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.pubnub</groupId> + <artifactId>pubnub-gson</artifactId> + <version>4.21.0</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> 
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubInputDStream.scala b/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubInputDStream.scala new file mode 100644 index 0000000..794784b --- /dev/null +++ b/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubInputDStream.scala @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.pubnub + +import java.io.{Externalizable, ObjectInput, ObjectOutput} + +import collection.JavaConverters._ +import com.google.gson.JsonParser +import com.pubnub.api.PNConfiguration +import com.pubnub.api.PubNub +import com.pubnub.api.callbacks.SubscribeCallback +import com.pubnub.api.enums.PNReconnectionPolicy +import com.pubnub.api.models.consumer.PNStatus +import com.pubnub.api.models.consumer.pubsub.PNMessageResult +import com.pubnub.api.models.consumer.pubsub.PNPresenceEventResult + +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.util.Utils + +private[streaming] +class PubNubInputDStream(_ssc: StreamingContext, + val configuration: PNConfiguration, + val channels: Seq[String], + val channelGroups: Seq[String], + val timeToken: Option[Long], + val _storageLevel: StorageLevel) + extends ReceiverInputDStream[SparkPubNubMessage](_ssc) { + override def getReceiver(): Receiver[SparkPubNubMessage] = { + new PubNubReceiver( + new SparkPubNubNConfiguration(configuration), channels, channelGroups, + timeToken, _storageLevel + ) + } +} + +/** + * Wrapper class for PNConfiguration with only consumer-related, serializable properties. + * PubNub configuration model encapsulates various fields which are not serializable. 
+ */ +private[pubnub] +class SparkPubNubNConfiguration(configuration: PNConfiguration) extends Serializable { + var origin: String = configuration.getOrigin + var subscribeTimeout: Integer = configuration.getSubscribeTimeout + var secure: Boolean = configuration.isSecure + var subscribeKey: String = configuration.getSubscribeKey + var publishKey: String = configuration.getPublishKey + var cipherKey: String = configuration.getCipherKey + var authKey: String = configuration.getAuthKey + var uuid: String = configuration.getUuid + var connectTimeout: Integer = configuration.getConnectTimeout + var filterExpression: String = configuration.getFilterExpression + var reconnectionPolicy: PNReconnectionPolicy = configuration.getReconnectionPolicy + var maximumReconnectionRetries: Integer = configuration.getMaximumReconnectionRetries + var maximumConnections: Integer = configuration.getMaximumConnections + + def toConfiguration: PNConfiguration = { + val config = new PNConfiguration() + config.setOrigin(origin) + config.setSubscribeTimeout(subscribeTimeout) + config.setSecure(secure) + config.setSubscribeKey(subscribeKey) + config.setPublishKey(publishKey) + config.setCipherKey(cipherKey) + config.setAuthKey(authKey) + config.setUuid(uuid) + config.setConnectTimeout(connectTimeout) + config.setFilterExpression(filterExpression) + config.setReconnectionPolicy(reconnectionPolicy) + config.setMaximumReconnectionRetries(maximumReconnectionRetries) + config.setMaximumConnections(maximumConnections) + config + } +} + +/** + * Wrapper class for PNMessageResult with a custom serialization process. + * PubNub message model uses GSON objects which are not serializable. + */ +class SparkPubNubMessage extends Externalizable { + var message: PNMessageResult = _ + + // PubNub does not support sending empty messages. 
+ def getPayload: String = message.getMessage.toString + def getChannel: String = message.getChannel + def getPublisher: String = message.getPublisher + def getSubscription: String = message.getSubscription + // Convert to Unix timestamp. + def getTimestamp: Long = Math.ceil(message.getTimetoken / 10000).longValue() + + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { + def writeVariableLength(data: Any): Unit = { + data match { + case null => out.writeInt(-1) + case d => + val buffer = Utils.serialize(d) + out.writeInt(buffer.length) + out.write(buffer) + } + } + + writeVariableLength( + if (message.getMessage != null) message.getMessage.toString else null + ) + writeVariableLength(message.getChannel) + writeVariableLength(message.getPublisher) + writeVariableLength(message.getSubscription) + + out.writeLong(message.getTimetoken) + } + + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { + def readVariableLength(): Any = { + in.readInt match { + case -1 => null + case length => + val buffer = new Array[Byte](length) + in.readFully(buffer) + Utils.deserialize(buffer) + } + } + + val parser = new JsonParser + val builder = PNMessageResult.builder + + readVariableLength() match { + case null => + case data => builder.message(parser.parse(data.asInstanceOf[String])) + } + + builder.channel(readVariableLength().asInstanceOf[String]) + builder.publisher(readVariableLength().asInstanceOf[String]) + builder.subscription(readVariableLength().asInstanceOf[String]) + builder.timetoken(in.readLong()) + + message = builder.build() + } +} + +private[pubnub] +class PubNubReceiver(configuration: SparkPubNubNConfiguration, + channels: Seq[String], + channelGroups: Seq[String], + timeToken: Option[Long], + storageLevel: StorageLevel) + extends Receiver[SparkPubNubMessage](storageLevel) with Logging { + + var client: PubNub = _ + + override def onStart(): Unit = { + client = new PubNub(configuration.toConfiguration) + 
client.addListener( + new SubscribeCallback() { + def status(pubNub: PubNub, status: PNStatus): Unit = { + if (status.isError) { + log.error(s"Encountered PubNub error: $status.") + } + } + + def message(pubNub: PubNub, message: PNMessageResult): Unit = { + val record = new SparkPubNubMessage + record.message = message + store(record) + } + + def presence(pubNub: PubNub, presence: PNPresenceEventResult): Unit = { + } + } + ) + val builder = client.subscribe() + .channels(channels.toList.asJava) + .channelGroups(channelGroups.toList.asJava) + if (timeToken.isDefined) { + builder.withTimetoken(timeToken.get) + } + builder.execute() + } + + override def onStop(): Unit = { + client.unsubscribeAll() + client.destroy() + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubUtils.scala ---------------------------------------------------------------------- diff --git a/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubUtils.scala b/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubUtils.scala new file mode 100644 index 0000000..4aa47eb --- /dev/null +++ b/streaming-pubnub/src/main/scala/org/apache/spark/streaming/pubnub/PubNubUtils.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.pubnub + +import java.util.{Set => JSet} + +import collection.JavaConverters._ +import com.pubnub.api.PNConfiguration + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.apache.spark.streaming.api.java.JavaStreamingContext +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +object PubNubUtils { + /** + * Create an input stream that returns messages received from PubNub infrastructure. + * @param ssc Streaming context + * @param configuration PubNub client configuration + * @param channels Sequence of channels to subscribe + * @param channelGroups Sequence of channel groups to subscribe + * @param timeToken Optional point in time to start receiving messages from. + * Leave undefined to get only latest messages. + * @param storageLevel Storage level to use for storing the received objects + * @return Input stream + */ + def createStream( + ssc: StreamingContext, + configuration: PNConfiguration, + channels: Seq[String], + channelGroups: Seq[String], + timeToken: Option[Long] = None, + storageLevel: StorageLevel): ReceiverInputDStream[SparkPubNubMessage] = { + ssc.withNamedScope("PubNub Stream") { + new PubNubInputDStream( + ssc, configuration, channels, channelGroups, timeToken, storageLevel + ) + } + } + + /** + * Create an input stream that returns messages received from PubNub infrastructure. + * @param jssc Java streaming context + * @param configuration PubNub client configuration + * @param channels Set of channels to subscribe + * @param channelGroups Set of channel groups to subscribe + * @param timeToken Optional point in time to start receiving messages from. + * Specify <code>null</code> to get only latest messages. 
+ * @param storageLevel Storage level to use for storing the received objects + * @return Input stream + */ + def createStream( + jssc: JavaStreamingContext, + configuration: PNConfiguration, + channels: JSet[String], + channelGroups: JSet[String], + timeToken: Option[Long], + storageLevel: StorageLevel): JavaReceiverInputDStream[SparkPubNubMessage] = { + createStream( + jssc.ssc, configuration, Seq.empty ++ channels.asScala, + Seq.empty ++ channelGroups.asScala, timeToken, storageLevel + ) + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java ---------------------------------------------------------------------- diff --git a/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000..448fb5e --- /dev/null +++ b/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming; + +import org.apache.spark.SparkConf; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +public abstract class LocalJavaStreamingContext { + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + SparkConf conf = new SparkConf() + .setMaster("local[2]") + .setAppName("test") + .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); + ssc = new JavaStreamingContext(conf, new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/java/org/apache/spark/streaming/pubnub/JavaPubNubStreamSuite.java ---------------------------------------------------------------------- diff --git a/streaming-pubnub/src/test/java/org/apache/spark/streaming/pubnub/JavaPubNubStreamSuite.java b/streaming-pubnub/src/test/java/org/apache/spark/streaming/pubnub/JavaPubNubStreamSuite.java new file mode 100644 index 0000000..507b992 --- /dev/null +++ b/streaming-pubnub/src/test/java/org/apache/spark/streaming/pubnub/JavaPubNubStreamSuite.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.pubnub; + +import com.pubnub.api.PNConfiguration; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.junit.Test; + +import java.util.HashSet; + +public class JavaPubNubStreamSuite extends LocalJavaStreamingContext { + @Test + public void testPubNubStream() { + // Tests the API compatibility, but do not actually receive any data. + JavaReceiverInputDStream<SparkPubNubMessage> stream = PubNubUtils.createStream( + ssc, new PNConfiguration(), new HashSet<>(), new HashSet<>(), null, + StorageLevel.MEMORY_AND_DISK_SER_2() + ); + } +} http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/streaming-pubnub/src/test/resources/log4j.properties b/streaming-pubnub/src/test/resources/log4j.properties new file mode 100644 index 0000000..bcb37d2 --- /dev/null +++ b/streaming-pubnub/src/test/resources/log4j.properties @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootCategory=INFO, console, file + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark-project.jetty=WARN + http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/MessageSerializationSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/MessageSerializationSuite.scala b/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/MessageSerializationSuite.scala new file mode 100644 index 0000000..9d1c3e9 --- /dev/null +++ b/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/MessageSerializationSuite.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubnub
+
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.ObjectInputStream
+import java.io.ObjectOutputStream
+
+import com.google.gson.JsonParser
+import com.pubnub.api.models.consumer.pubsub.PNMessageResult
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Checks that a [[SparkPubNubMessage]] keeps its payload, channel, subscription,
+ * publisher and timestamp after a round trip through Java object serialization.
+ */
+class MessageSerializationSuite extends SparkFunSuite {
+  test("Full example") {
+    checkMessageSerialization(
+      "{\"message\":\"Hello, World!\"}", "channel1",
+      "publisher1", "subscription1", System.currentTimeMillis * 10000
+    )
+  }
+
+  test("Message from channel") {
+    checkMessageSerialization("{\"message\":\"Hello, World!\"}", "c", "p", null, 13534398158620385L)
+  }
+
+  test("Message from subscription") {
+    checkMessageSerialization("{\"message\":\"Hello, World!\"}", null, "p", "s", 13534397812467596L)
+  }
+
+  /**
+   * Builds a PubNub message from the given fields, wraps it in a [[SparkPubNubMessage]],
+   * serializes and deserializes the wrapper, and asserts that every field is preserved.
+   *
+   * @param payload JSON text of the message body; parsed with Gson unless it is null
+   * @param channel channel name passed to the builder, may be null
+   * @param publisher publisher identifier passed to the builder
+   * @param subscription subscription name passed to the builder, may be null
+   * @param timestamp value passed to the builder as the PubNub time token; the
+   *                  deserialized message is expected to report `timestamp / 10000`
+   */
+  def checkMessageSerialization(payload: String, channel: String,
+      publisher: String, subscription: String, timestamp: Long): Unit = {
+    val pubNubMessage = PNMessageResult.builder
+      .message(if (payload != null) new JsonParser().parse(payload) else null)
+      .channel(channel)
+      .publisher(publisher)
+      .subscription(subscription)
+      .timetoken(timestamp)
+      .build()
+    val sparkMessage = new SparkPubNubMessage
+    sparkMessage.message = pubNubMessage
+
+    // serialize
+    val byteOutStream = new ByteArrayOutputStream
+    val outputStream = new ObjectOutputStream(byteOutStream)
+    try {
+      outputStream.writeObject(sparkMessage)
+      outputStream.flush()
+    } finally {
+      // Closing the object stream also closes the wrapped byte stream.
+      outputStream.close()
+    }
+    val serializedBytes = byteOutStream.toByteArray
+
+    // deserialize
+    val inputStream = new ObjectInputStream(new ByteArrayInputStream(serializedBytes))
+    val deserializedMessage =
+      try {
+        inputStream.readObject().asInstanceOf[SparkPubNubMessage]
+      } finally {
+        inputStream.close()
+      }
+
+    // `==` is null-safe, so a null expectation asserts that the getter returns null
+    // instead of failing with a NullPointerException on the expected value.
+    assert(payload == deserializedMessage.getPayload)
+    assert(channel == deserializedMessage.getChannel)
+    assert(subscription == deserializedMessage.getSubscription)
+    assert(publisher == deserializedMessage.getPublisher)
+
+    // Integer division already truncates, so the former Math.ceil around it had no effect.
+    // NOTE(review): presumably mirrors the conversion in SparkPubNubMessage.getTimestamp - confirm.
+    val unixTimestamp = timestamp / 10000
+    assert(unixTimestamp.equals(deserializedMessage.getTimestamp))
+  }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/9373fa4e/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/PubNubStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/PubNubStreamSuite.scala b/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/PubNubStreamSuite.scala
new file mode 100644
index 0000000..aa461db
--- /dev/null
+++ b/streaming-pubnub/src/test/scala/org/apache/spark/streaming/pubnub/PubNubStreamSuite.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.pubnub
+
+import java.util.{Map => JMap}
+import java.util.UUID
+
+import collection.JavaConverters._
+import com.google.gson.JsonObject
+import com.pubnub.api.{PNConfiguration, PubNub, PubNubException}
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time
+import org.scalatest.time.Span
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Tests for the PubNub input stream. Messages are published with a [[PubNub]] client
+ * configured with the "demo" keys on channel "test" and are expected to show up in the
+ * stream created by `PubNubUtils.createStream`.
+ *
+ * NOTE(review): presumably requires network access to the PubNub service - confirm.
+ */
+class PubNubStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter {
+  val subscribeKey = "demo"
+  val publishKey = "demo"
+  val channel = "test"
+
+  var ssc: StreamingContext = _
+  var configuration: PNConfiguration = _
+  var client: PubNub = _
+
+  /** Creates the shared configuration and the publishing client once per suite. */
+  override def beforeAll(): Unit = {
+    // Let the parent suite run its own setup first.
+    super.beforeAll()
+    configuration = new PNConfiguration()
+    configuration.setSubscribeKey(subscribeKey)
+    configuration.setPublishKey(publishKey)
+    client = new PubNub(configuration)
+  }
+
+  /** Destroys the publishing client and then runs the parent suite's cleanup. */
+  override def afterAll(): Unit = {
+    try {
+      // The client is still null if beforeAll failed before creating it.
+      if (client != null) {
+        client.destroy()
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  before {
+    ssc = new StreamingContext("local[2]", this.getClass.getSimpleName, Seconds(1))
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+    }
+  }
+
+  test("Stream receives messages") {
+    val nbOfMsg = 5
+    // Each message carries a random UUID so that its payload is unique.
+    val publishedMessages: List[JsonObject] = List.fill(nbOfMsg) {
+      val message = new JsonObject()
+      message.addProperty("text", UUID.randomUUID().toString)
+      message
+    }
+    @volatile var receivedMessages: Set[SparkPubNubMessage] = Set()
+
+    val receiveStream = PubNubUtils.createStream(
+      ssc, configuration, Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2
+    )
+    receiveStream.foreachRDD { rdd =>
+      // Collect once per batch and keep every message of the batch.
+      val batch = rdd.collect()
+      if (batch.nonEmpty) {
+        receivedMessages = receivedMessages ++ batch
+      }
+    }
+    ssc.start()
+
+    eventually(timeout(Span(15, time.Seconds)), interval(Span(1000, time.Millis))) {
+      // (Re)publish only the messages that have not been seen yet.
+      val seenPayloads = receivedMessages.map(received => received.getPayload)
+      publishedMessages
+        .filterNot(published => seenPayloads.contains(published.toString))
+        .foreach(pending => publishMessage(pending))
+      assert(
+        publishedMessages.map(published => published.toString).toSet
+          .subsetOf(receivedMessages.map(received => received.getPayload))
+      )
+      assert(channel.equals(receivedMessages.head.getChannel))
+    }
+  }
+
+  test("Message filtering") {
+    val config = new PNConfiguration()
+    config.setSubscribeKey(subscribeKey)
+    config.setPublishKey(publishKey)
+    config.setFilterExpression("language == 'english'")
+
+    @volatile var receivedMessages: Set[SparkPubNubMessage] = Set()
+
+    val receiveStream = PubNubUtils.createStream(
+      ssc, config, Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2
+    )
+    receiveStream.foreachRDD { rdd =>
+      // Collect once per batch and keep every message of the batch.
+      val batch = rdd.collect()
+      if (batch.nonEmpty) {
+        receivedMessages = receivedMessages ++ batch
+      }
+    }
+    ssc.start()
+
+    eventually(timeout(Span(15, time.Seconds)), interval(Span(1000, time.Millis))) {
+      val polishMessage = new JsonObject()
+      polishMessage.addProperty("text", "dzien dobry")
+      publishMessage(polishMessage, Map("language" -> "polish").asJava)
+
+      val englishMessage = new JsonObject()
+      englishMessage.addProperty("text", "good morning")
+      publishMessage(englishMessage, Map("language" -> "english").asJava)
+
+      // Only the message whose metadata matches the filter expression may arrive.
+      assert(receivedMessages.map(received => received.getPayload).size == 1)
+      assert(receivedMessages.head.getPayload.equals(englishMessage.toString))
+    }
+  }
+
+  test("Test time token") {
+    val config = new PNConfiguration()
+    config.setSubscribeKey(subscribeKey)
+    config.setPublishKey(publishKey)
+
+    @volatile var receivedMessages: Set[SparkPubNubMessage] = Set()
+
+    val currentTimeToken = client.time().sync().getTimetoken
+
+    // Try to register subscriber with time token after we send first message.
+    val receiveStream = PubNubUtils.createStream(
+      ssc, config, Seq(channel), Seq(), Some(currentTimeToken + 5000*10000),
+      StorageLevel.MEMORY_AND_DISK_SER_2
+    )
+    receiveStream.foreachRDD { rdd =>
+      // Collect once per batch and keep every message of the batch.
+      val batch = rdd.collect()
+      if (batch.nonEmpty) {
+        receivedMessages = receivedMessages ++ batch
+      }
+    }
+    ssc.start()
+
+    // Give time for subscription to successfully register.
+    Thread.sleep(1000)
+
+    // Make sure we publish the message. Hopefully it will not take more than 5 seconds.
+    // Otherwise we may see the message and the test will fail.
+    val pastMessage = new JsonObject()
+    pastMessage.addProperty("text", "past")
+    var timeToken = -1L
+    while (timeToken == -1L) {
+      // publishMessage returns -1 when the demo account quota was hit; retry until accepted.
+      timeToken = publishMessage(message = pastMessage, store = true)
+      Thread.sleep(500)
+    }
+
+    eventually(timeout(Span(15, time.Seconds)), interval(Span(1000, time.Millis))) {
+      val futureMessage = new JsonObject()
+      futureMessage.addProperty("text", "future")
+      publishMessage(futureMessage)
+
+      assert(receivedMessages.map(received => received.getPayload).size == 1)
+      assert(receivedMessages.head.getPayload.equals(futureMessage.toString))
+    }
+  }
+
+  /**
+   * Publishes a message synchronously on the test channel.
+   *
+   * @param message JSON body to publish
+   * @param metadata metadata attached to the message, empty by default
+   * @param store value passed to `shouldStore` on the publish request
+   * @return time token reported for the published message, or -1 when publishing was
+   *         rejected with the demo account quota error
+   * @throws RuntimeException wrapping any other [[PubNubException]]
+   */
+  def publishMessage(message: JsonObject,
+      metadata: JMap[String, String] = Map.empty[String, String].asJava,
+      store: Boolean = false) : Long = {
+    try {
+      client.publish().channel(channel).meta(metadata)
+        .message(message).shouldStore(store).sync().getTimetoken
+    } catch {
+      // Option(...) guards against an exception that carries no error message.
+      case e: PubNubException
+          if Option(e.getErrormsg).exists(_.contains("Account quota exceeded (2/1000000)")) =>
+        // Ignore quota limits on demo account. We will retry.
+        -1L
+      case e: PubNubException =>
+        throw new RuntimeException(e)
+    }
+  }
+}
