This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f62c3a  Added RabbitMQ source to Pulsar Connect (#1563)
9f62c3a is described below

commit 9f62c3a29ba4da541f338d86572f55c332208283
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Apr 12 16:23:31 2018 -0700

    Added RabbitMQ source to Pulsar Connect (#1563)
---
 pom.xml                                            |  1 +
 pulsar-connect/pom.xml                             |  1 +
 pulsar-connect/rabbitmq/pom.xml                    | 67 ++++++++++++++++
 .../pulsar/connect/rabbitmq/RabbitMQConfig.java    | 55 +++++++++++++
 .../pulsar/connect/rabbitmq/RabbitMQSource.java    | 89 ++++++++++++++++++++++
 5 files changed, 213 insertions(+)

diff --git a/pom.xml b/pom.xml
index a597861..c67b742 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,6 +144,7 @@ flexible messaging model and an intuitive client API.</description>
     <hbc-core.version>2.2.0</hbc-core.version>
     <cassandra-driver-core.version>3.4.0</cassandra-driver-core.version>
     <aerospike-client.version>4.1.5</aerospike-client.version>
+    <rabbitmq-client.version>5.1.1</rabbitmq-client.version>
 
     <!-- test dependencies -->
     <disruptor.version>3.4.0</disruptor.version>
diff --git a/pulsar-connect/pom.xml b/pulsar-connect/pom.xml
index 38f9e63..1b955ac 100644
--- a/pulsar-connect/pom.xml
+++ b/pulsar-connect/pom.xml
@@ -36,6 +36,7 @@
     <module>twitter</module>
     <module>cassandra</module>
     <module>aerospike</module>
+    <module>rabbitmq</module>
   </modules>
 
 </project>
diff --git a/pulsar-connect/rabbitmq/pom.xml b/pulsar-connect/rabbitmq/pom.xml
new file mode 100644
index 0000000..e4277c7
--- /dev/null
+++ b/pulsar-connect/rabbitmq/pom.xml
@@ -0,0 +1,67 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-connect</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-connect-rabbitmq</artifactId>
+  <name>Pulsar Connect :: RabbitMQ</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-connect-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.rabbitmq</groupId>
+      <artifactId>amqp-client</artifactId>
+      <version>${rabbitmq-client.version}</version>
+    </dependency>
+
+  </dependencies>
+
+</project>
diff --git a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
new file mode 100644
index 0000000..1d7268e
--- /dev/null
+++ b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.rabbitmq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Settings for the RabbitMQ source connector.
+ *
+ * <p>Instances are created through one of the {@code load} factories, which use Jackson to bind
+ * either a YAML file or a string map onto the properties declared here. Lombok's {@code @Data}
+ * generates the getters, setters, {@code equals}, {@code hashCode} and {@code toString};
+ * {@code @Accessors(chain = true)} makes every generated setter return {@code this}.
+ */
+@Data
+@Accessors(chain = true)
+public class RabbitMQConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Name handed to the RabbitMQ client when the connection is created; may be unset. */
+    private String connectionName;
+    /** AMQP URI of the broker to connect to; required by the source. */
+    private String amqUri;
+    /** Name of the queue messages are consumed from; required by the source. */
+    private String queueName;
+
+    /**
+     * Reads a configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file to parse
+     * @return the configuration described by the file
+     * @throws IOException if the file cannot be read or cannot be bound to this class
+     */
+    public static RabbitMQConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), RabbitMQConfig.class);
+    }
+
+    /**
+     * Builds a configuration from a map of property names to values.
+     *
+     * <p>The map is serialized to JSON and read back as a {@code RabbitMQConfig}, so its keys
+     * must match the property names of this class.
+     *
+     * @param map property names mapped to their string values
+     * @return the configuration described by the map
+     * @throws IOException if the map cannot be serialized or bound to this class
+     */
+    public static RabbitMQConfig load(Map<String, String> map) throws IOException {
+        // One mapper handles both directions of the round trip.
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), RabbitMQConfig.class);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
new file mode 100644
index 0000000..59ce73b
--- /dev/null
+++ b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.rabbitmq;
+
+import com.rabbitmq.client.*;
+import org.apache.pulsar.connect.core.PushSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * A simple connector to consume messages from a RabbitMQ queue.
+ *
+ * <p>{@link #setConsumer} supplies the function each message body is handed to, {@link #open}
+ * connects to the broker and subscribes to the configured queue, and {@link #close} releases
+ * the channel and the connection.
+ */
+public class RabbitMQSource implements PushSource<byte[]> {
+
+    private static final Logger logger = LoggerFactory.getLogger(RabbitMQSource.class);
+
+    private Function<byte[], CompletableFuture<Void>> consumer;
+    private Connection rabbitMQConnection;
+    private Channel rabbitMQChannel;
+    private RabbitMQConfig rabbitMQConfig;
+
+    /**
+     * Registers the function that receives the body of every delivered message.
+     *
+     * @param consumeFunction function applied to each message body; the returned future is
+     *                        used to decide when the delivery is acknowledged
+     */
+    @Override
+    public void setConsumer(Function<byte[], CompletableFuture<Void>> consumeFunction) {
+        this.consumer = consumeFunction;
+    }
+
+    /**
+     * Connects to RabbitMQ, declares the configured queue and starts consuming from it.
+     *
+     * <p>The consume function captured here is the one registered at the time of this call.
+     *
+     * @param config connector properties; {@code amqUri} and {@code queueName} are required
+     * @throws IllegalArgumentException if a required property is missing
+     * @throws Exception if the connection, the channel or the subscription cannot be set up
+     */
+    @Override
+    public void open(Map<String, String> config) throws Exception {
+        rabbitMQConfig = RabbitMQConfig.load(config);
+        if (rabbitMQConfig == null
+                || rabbitMQConfig.getAmqUri() == null
+                || rabbitMQConfig.getQueueName() == null) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+        ConnectionFactory connectionFactory = new ConnectionFactory();
+        connectionFactory.setUri(rabbitMQConfig.getAmqUri());
+        rabbitMQConnection = connectionFactory.newConnection(rabbitMQConfig.getConnectionName());
+        try {
+            logger.info("A new connection to {}:{} has been opened successfully.",
+                    rabbitMQConnection.getAddress().getCanonicalHostName(),
+                    rabbitMQConnection.getPort()
+            );
+            rabbitMQChannel = rabbitMQConnection.createChannel();
+            // Arguments after the name: durable, exclusive, autoDelete, extra arguments.
+            rabbitMQChannel.queueDeclare(rabbitMQConfig.getQueueName(), false, false, false, null);
+            // This overload subscribes with explicit acknowledgements; RabbitMQConsumer acks.
+            com.rabbitmq.client.Consumer deliveryHandler =
+                    new RabbitMQConsumer(this.consumer, rabbitMQChannel);
+            rabbitMQChannel.basicConsume(rabbitMQConfig.getQueueName(), deliveryHandler);
+            logger.info("A consumer for queue {} has been successfully started.",
+                    rabbitMQConfig.getQueueName());
+        } catch (Exception e) {
+            // Do not leave a half-initialized connection behind when setup fails.
+            try {
+                close();
+            } catch (Exception closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Closes the channel and then the connection.
+     *
+     * <p>Safe to call when {@link #open} never ran or failed part-way, and safe to call more
+     * than once. The connection is closed even if closing the channel throws.
+     *
+     * @throws Exception if closing the channel or the connection fails
+     */
+    @Override
+    public void close() throws Exception {
+        Channel channel = rabbitMQChannel;
+        Connection connection = rabbitMQConnection;
+        rabbitMQChannel = null;
+        rabbitMQConnection = null;
+        try {
+            if (channel != null && channel.isOpen()) {
+                channel.close();
+            }
+        } finally {
+            if (connection != null && connection.isOpen()) {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     * Channel callback that forwards each message body to the consume function and
+     * acknowledges the delivery once the returned future completes successfully.
+     */
+    private static class RabbitMQConsumer extends DefaultConsumer {
+        private final Function<byte[], CompletableFuture<Void>> consumeFunction;
+
+        RabbitMQConsumer(Function<byte[], CompletableFuture<Void>> consumeFunction,
+                         Channel channel) {
+            super(channel);
+            this.consumeFunction = consumeFunction;
+        }
+
+        @Override
+        public void handleDelivery(String consumerTag, Envelope envelope,
+                                   AMQP.BasicProperties properties, byte[] body)
+                throws IOException {
+            final long deliveryTag = envelope.getDeliveryTag();
+            consumeFunction.apply(body).whenComplete((ignored, failure) -> {
+                if (failure != null) {
+                    // Left unacknowledged on purpose: no ack means the broker still owns it.
+                    logger.warn("Processing of delivery {} failed; message not acknowledged.",
+                            deliveryTag, failure);
+                    return;
+                }
+                try {
+                    // multiple=false: this callback may run on a thread other than the one
+                    // that received the delivery, so only this single tag is acknowledged.
+                    getChannel().basicAck(deliveryTag, false);
+                } catch (IOException | AlreadyClosedException e) {
+                    logger.warn("Failed to acknowledge delivery {}.", deliveryTag, e);
+                }
+            });
+        }
+    }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to