[
https://issues.apache.org/jira/browse/GEARPUMP-303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15998007#comment-15998007
]
ASF GitHub Bot commented on GEARPUMP-303:
-----------------------------------------
Github user huafengw commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/178#discussion_r114957864
--- Diff: external/rabbitmq/src/main/scala/org/apache/gearpump/external/rabbitmq/RMQSink.scala ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.external.rabbitmq
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import com.rabbitmq.client.Channel
+import com.rabbitmq.client.{Connection, ConnectionFactory}
+
+class RMQSink(userConfig: UserConfig) extends DataSink{
+
+ var connection: Connection = null
+ var channel: Channel = null
+ var queueName: String = null
+
+ override def open(context: TaskContext): Unit = {
+ val factory : ConnectionFactory =
RMQSink.getConnectionFactory(userConfig)
+ connection = factory.newConnection
+ channel = connection.createChannel
+ if (channel == null) {
+ throw new RuntimeException("None of RabbitMQ channels are
available.")
+ }
+ setupQueue()
+ }
+
+ override def write(message: Message): Unit = {
+ publish(message.msg)
+ }
+
+ override def close(): Unit = {
+ channel.close()
+ connection.close()
+ }
+
+ protected def setupQueue(): Unit = {
+ val queue = RMQSink.getQueueName(userConfig)
+ if (!queue.nonEmpty) {
+ throw new RuntimeException("can not get a RabbitMQ queue name")
+ }
+
+ queueName = queue.get
+ channel.queueDeclare(queue.get, false, false, false, null)
+ }
+
+ def publish(msg: Any): Unit = {
+ msg match {
+ case seq: Seq[Any] =>
+ seq.foreach(publish)
+ case str: String => {
+ channel.basicPublish("", queueName, null,
msg.asInstanceOf[String].getBytes)
+ }
+ case byteArray: Array[Byte@unchecked] => {
--- End diff --
Is the `@unchecked` annotation needed here?
> add a RabbitMQ sink to integrate with gearpump
> ----------------------------------------------
>
> Key: GEARPUMP-303
> URL: https://issues.apache.org/jira/browse/GEARPUMP-303
> Project: Apache Gearpump
> Issue Type: Task
> Components: connectors
> Reporter: yanghua
> Assignee: yanghua
>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)