Repository: incubator-gearpump Updated Branches: refs/heads/master b9252a38e -> 232f527d6
[GEARPUMP-303] add a RabbitMQ sink to integrate with gearpump Author: vinoyang <[email protected]> Closes #180 from yanghua/dev. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/232f527d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/232f527d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/232f527d Branch: refs/heads/master Commit: 232f527d696f81c6e1d98e794ab37dda32498811 Parents: b9252a3 Author: vinoyang <[email protected]> Authored: Mon May 8 15:20:01 2017 +0800 Committer: huafengw <[email protected]> Committed: Mon May 8 15:21:30 2017 +0800 ---------------------------------------------------------------------- experiments/rabbitmq/README.md | 40 ++++ .../experimental/rabbitmq/RMQSink.scala | 184 +++++++++++++++++++ .../rabbitmq/RabbitmqSinkSpec.scala | 49 +++++ project/BuildExperiments.scala | 15 +- project/Dependencies.scala | 1 + 5 files changed, 288 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/232f527d/experiments/rabbitmq/README.md ---------------------------------------------------------------------- diff --git a/experiments/rabbitmq/README.md b/experiments/rabbitmq/README.md new file mode 100644 index 0000000..9059608 --- /dev/null +++ b/experiments/rabbitmq/README.md @@ -0,0 +1,40 @@ +# Gearpump RabbitMQ + +Gearpump integration for [RabbitMQ](https://www.rabbitmq.com/) + +## Usage + +The message type that RMQSink is able to handle including: + + 1. String + 2. Array[Byte] + 3. 
Sequence of type 1 and 2 + +Suppose there is a DataSource Task that outputs the above-mentioned messages; you can then write a simple application: + +```scala +val sink = new RMQSink(UserConfig.empty) +val sinkProcessor = DataSinkProcessor(sink, "$sinkNum") +val split = Processor[DataSource]("$splitNum") +val computation = split ~> sinkProcessor +val application = StreamApplication("RabbitMQ", Graph(computation), UserConfig.empty) +``` +## config items +To initialize an RMQSink instance, provide a UserConfig object containing the config items listed below: + +* [must]`rabbitmq.queue.name` : the RabbitMQ queue name we want to sink the message to; +* [optional]`rabbitmq.connection.host` : the RabbitMQ server host; required when `rabbitmq.connection.uri` is not set; +* [optional]`rabbitmq.connection.port` : the RabbitMQ server port (RabbitMQ's default port is **5672**); required when `rabbitmq.connection.uri` is not set; +* [optional]`rabbitmq.connection.uri` : the connection uri, pattern is `amqp://userName:password@hostName:portNumber/virtualHost` +* [optional]`rabbitmq.virtualhost` : the virtual host, which is a logical domain in the RabbitMQ server +* [optional]`rabbitmq.auth.username` : the user name for authorization +* [optional]`rabbitmq.auth.password` : the password for authorization +* [optional]`rabbitmq.automatic.recovery` : set `true` to enable automatic recovery, otherwise set `false` +* [optional]`rabbitmq.connection.timeout` : the connection's timeout +* [optional]`rabbitmq.network.recovery.interval` : the network recovery interval (an integer value) +* [optional]`rabbitmq.requested.heartbeat` : the requested heartbeat interval (an integer value) +* [optional]`rabbitmq.topology.recoveryenabled` : set `true` to enable topology recovery, otherwise set `false` +* [optional]`rabbitmq.channel.max` : the requested maximum number of channels +* [optional]`rabbitmq.frame.max` : the requested maximum frame size + +more details : https://www.rabbitmq.com/admin-guide.html \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/232f527d/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala ---------------------------------------------------------------------- diff --git a/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala b/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala new file mode 100644 index 0000000..492fffe --- /dev/null +++ b/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.experimental.rabbitmq + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.task.TaskContext +import com.rabbitmq.client.Channel +import com.rabbitmq.client.{Connection, ConnectionFactory} +import org.apache.gearpump.util.LogUtil + +class RMQSink(userConfig: UserConfig, + val connFactory: (UserConfig) => ConnectionFactory) extends DataSink{ + + private val LOG = LogUtil.getLogger(getClass) + var connectionFactory: ConnectionFactory = connFactory(userConfig) + var connection: Connection = null + var channel: Channel = null + var queueName: String = null + + def this(userConfig: UserConfig) = { + this(userConfig, RMQSink.getConnectionFactory) + } + + override def open(context: TaskContext): Unit = { + connection = connectionFactory.newConnection + channel = connection.createChannel + if (channel == null) { + throw new RuntimeException("None of RabbitMQ channels are available.") + } + setupQueue() + } + + override def write(message: Message): Unit = { + publish(message.msg) + } + + override def close(): Unit = { + channel.close() + connection.close() + } + + protected def setupQueue(): Unit = { + val queue = RMQSink.getQueueName(userConfig) + if (queue.isEmpty) { + throw new RuntimeException("can not get a RabbitMQ queue name") + } + + queueName = queue.get + channel.queueDeclare(queue.get, false, false, false, null) + } + + def publish(msg: Any): Unit = { + msg match { + case seq: Seq[Any] => + seq.foreach(publish) + case str: String => { + channel.basicPublish("", queueName, null, msg.asInstanceOf[String].getBytes) + } + case byteArray: Array[Byte] => { + channel.basicPublish("", queueName, null, byteArray) + } + case _ => { + LOG.warn("matched unsupported message!") + } + } + } + +} + +object RMQSink { + + val RMQSINK = "rmqsink" + val QUEUE_NAME = "rabbitmq.queue.name" + val SERVER_HOST = 
"rabbitmq.connection.host" + val SERVER_PORT = "rabbitmq.connection.port" + val CONNECTION_URI = "rabbitmq.connection.uri" + val VIRTUAL_HOST = "rabbitmq.virtualhost" + val AUTH_USERNAME = "rabbitmq.auth.username" + val AUTH_PASSWORD = "rabbitmq.auth.password" + val AUTOMATIC_RECOVERY = "rabbitmq.automatic.recovery" + val CONNECTION_TIMEOUT = "rabbitmq.connection.timeout" + val NETWORK_RECOVERY_INTERVAL = "rabbitmq.network.recovery.interval" + val REQUESTED_HEARTBEAT = "rabbitmq.requested.heartbeat" + val TOPOLOGY_RECOVERY_ENABLED = "rabbitmq.topology.recoveryenabled" + val REQUESTED_CHANNEL_MAX = "rabbitmq.channel.max" + val REQUESTED_FRAME_MAX = "rabbitmq.frame.max" + + def getConnectionFactory(userConfig : UserConfig): ConnectionFactory = { + val factory : ConnectionFactory = new ConnectionFactory + + val uri : Option[String] = userConfig.getString(CONNECTION_URI) + if (uri.nonEmpty) { + factory.setUri(uri.get) + } else { + val serverHost : Option[String] = userConfig.getString(SERVER_HOST) + val serverPort : Option[Int] = userConfig.getInt(SERVER_PORT) + if (!serverHost.nonEmpty) { + throw new RuntimeException("missed config key : " + SERVER_HOST) + } + + if (!serverPort.nonEmpty) { + throw new RuntimeException("missed config key : " + SERVER_PORT) + } + + factory.setHost(serverHost.get) + factory.setPort(serverPort.get) + } + + val virtualHost : Option[String] = userConfig.getString(VIRTUAL_HOST) + if (virtualHost.nonEmpty) { + factory.setVirtualHost(virtualHost.get) + } + + val authUserName : Option[String] = userConfig.getString(AUTH_USERNAME) + if (authUserName.nonEmpty) { + factory.setUsername(authUserName.get) + } + + val authPassword : Option[String] = userConfig.getString(AUTH_PASSWORD) + if (authPassword.nonEmpty) { + factory.setPassword(authPassword.get) + } + + val automaticRecovery : Option[Boolean] = userConfig.getBoolean(AUTOMATIC_RECOVERY) + if (automaticRecovery.nonEmpty) { + factory.setAutomaticRecoveryEnabled(automaticRecovery.get) + } + + val 
connectionTimeOut : Option[Int] = userConfig.getInt(CONNECTION_TIMEOUT) + if (connectionTimeOut.nonEmpty) { + factory.setConnectionTimeout(connectionTimeOut.get) + } + + val networkRecoveryInterval : Option[Int] = userConfig.getInt(NETWORK_RECOVERY_INTERVAL) + if (networkRecoveryInterval.nonEmpty) { + factory.setNetworkRecoveryInterval(networkRecoveryInterval.get) + } + + val requestedHeartBeat : Option[Int] = userConfig.getInt(REQUESTED_HEARTBEAT) + if (requestedHeartBeat.nonEmpty) { + factory.setRequestedHeartbeat(requestedHeartBeat.get) + } + + val topologyRecoveryEnabled : Option[Boolean] = userConfig.getBoolean(TOPOLOGY_RECOVERY_ENABLED) + if (topologyRecoveryEnabled.nonEmpty) { + factory.setTopologyRecoveryEnabled(topologyRecoveryEnabled.get) + } + + val requestedChannelMax : Option[Int] = userConfig.getInt(REQUESTED_CHANNEL_MAX) + if (requestedChannelMax.nonEmpty) { + factory.setRequestedChannelMax(requestedChannelMax.get) + } + + val requestedFrameMax : Option[Int] = userConfig.getInt(REQUESTED_FRAME_MAX) + if (requestedFrameMax.nonEmpty) { + factory.setRequestedFrameMax(requestedFrameMax.get) + } + + factory + } + + def getQueueName(userConfig: UserConfig): Option[String] = { + userConfig.getString(QUEUE_NAME) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/232f527d/experiments/rabbitmq/src/test/scala/org/apache/gearpump/experimental/rabbitmq/RabbitmqSinkSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/rabbitmq/src/test/scala/org/apache/gearpump/experimental/rabbitmq/RabbitmqSinkSpec.scala b/experiments/rabbitmq/src/test/scala/org/apache/gearpump/experimental/rabbitmq/RabbitmqSinkSpec.scala new file mode 100644 index 0000000..337579e --- /dev/null +++ b/experiments/rabbitmq/src/test/scala/org/apache/gearpump/experimental/rabbitmq/RabbitmqSinkSpec.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.experimental.rabbitmq + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.task.TaskContext +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class RabbitmqSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + property("RMQSink should insert a row successfully") { + + val taskContext = mock[TaskContext] + + val map = Map[String, String]("rabbitmq.queue.name" -> "test", + "rabbitmq.connection.host" -> "localhost", + "rabbitmq.connection.port" -> "5672") + val userConfig = new UserConfig(map) + + val rmqSink = new RMQSink(userConfig) + + assert(RMQSink.getQueueName(userConfig).get == "test") + +// rmqSink.open(taskContext) + +// var msg: String = "{ 'hello' : 'world' }" +// rmqSink.publish(msg) + +// rmqSink.close() + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/232f527d/project/BuildExperiments.scala ---------------------------------------------------------------------- diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala index 2f491d3..92957a1 100644 --- a/project/BuildExperiments.scala +++ b/project/BuildExperiments.scala @@ -29,7 +29,8 @@ 
object BuildExperiments extends sbt.Build { cgroup, redis, storm, - yarn + yarn, + rabbitmq ) lazy val yarn = Project( @@ -118,4 +119,16 @@ object BuildExperiments extends sbt.Build { settings = commonSettings ++ noPublish) .dependsOn (core % "provided") .disablePlugins(sbtassembly.AssemblyPlugin) + + lazy val rabbitmq = Project( + id = "gearpump-experimentals-rabbitmq", + base = file("experiments/rabbitmq"), + settings = commonSettings ++ noPublish ++ + Seq( + libraryDependencies ++= Seq( + "com.rabbitmq" % "amqp-client" % rabbitmqVersion + ) + )) + .dependsOn(core % "provided", streaming % "test->test; provided") + .disablePlugins(sbtassembly.AssemblyPlugin) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/232f527d/project/Dependencies.scala ---------------------------------------------------------------------- diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 4e30d3f..aa4e52f 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -53,6 +53,7 @@ object Dependencies { val algebirdVersion = "0.9.0" val chillVersion = "0.6.0" val jedisVersion = "2.9.0" + val rabbitmqVersion = "3.5.3" val coreDependencies = Seq( libraryDependencies ++= Seq(
