This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e370b97  Added Aerospike Sink (#1545)
e370b97 is described below

commit e370b976649d761042c4b112cea4b6107ee38dd7
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Tue Apr 10 20:45:15 2018 -0700

    Added Aerospike Sink (#1545)
---
 pom.xml                                            |   1 +
 pulsar-connect/aerospike/pom.xml                   |  67 +++++++++
 .../pulsar/connect/aerospike/AerospikeSink.java    | 151 +++++++++++++++++++++
 .../connect/aerospike/AerospikeSinkConfig.java     |  64 +++++++++
 pulsar-connect/pom.xml                             |   1 +
 5 files changed, 284 insertions(+)

diff --git a/pom.xml b/pom.xml
index 77b0fa3..a597861 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,6 +143,7 @@ flexible messaging model and an intuitive client API.</description>
     <jctools.version>2.1.1</jctools.version>
     <hbc-core.version>2.2.0</hbc-core.version>
     <cassandra-driver-core.version>3.4.0</cassandra-driver-core.version>
+    <aerospike-client.version>4.1.5</aerospike-client.version>
 
     <!-- test dependencies -->
     <disruptor.version>3.4.0</disruptor.version>
diff --git a/pulsar-connect/aerospike/pom.xml b/pulsar-connect/aerospike/pom.xml
new file mode 100644
index 0000000..5bbd111
--- /dev/null
+++ b/pulsar-connect/aerospike/pom.xml
@@ -0,0 +1,67 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-connect</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-connect-aerospike</artifactId>
+  <name>Pulsar Connect :: Aerospike</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-connect-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.aerospike</groupId>
+      <artifactId>aerospike-client</artifactId>
+      <version>${aerospike-client.version}</version>
+    </dependency>
+
+  </dependencies>
+
+</project>
diff --git a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
new file mode 100644
index 0000000..c6a34ba
--- /dev/null
+++ b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.aerospike;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.aerospike.client.*;
+import com.aerospike.client.async.EventLoop;
+import com.aerospike.client.async.EventPolicy;
+import com.aerospike.client.async.NioEventLoops;
+import com.aerospike.client.listener.WriteListener;
+import com.aerospike.client.policy.ClientPolicy;
+import com.aerospike.client.policy.WritePolicy;
+import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.connect.core.Sink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple Aerospike sink that stores every {@link KeyValue} as a single-bin record.
+ *
+ * <p>The record key is built from the configured keyspace, the configured key set and the
+ * string form of the tuple key; the tuple value is written as a blob into the bin named by
+ * the configured column name. Writes are issued through the asynchronous client API, and the
+ * number of in-flight writes is bounded by a fixed pool of reusable listeners whose size is
+ * {@code maxConcurrentRequests}.
+ */
+public class AerospikeSink<K, V> implements Sink<KeyValue<K, V>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AerospikeSink.class);
+
+    // ----- Runtime fields
+    private AerospikeSinkConfig aerospikeSinkConfig;
+    private AerospikeClient client;
+    private WritePolicy writePolicy;
+    // Pool of idle listeners; taking one acts as the permit for an in-flight write.
+    private BlockingQueue<AWriteListener> queue;
+    // Kept so that close() can shut the event loop threads down.
+    private NioEventLoops eventLoops;
+    private EventLoop eventLoop;
+
+    /**
+     * Loads the sink configuration, connects to the cluster and fills the listener pool.
+     *
+     * @param config sink properties, mapped onto {@link AerospikeSinkConfig}
+     * @throws IllegalArgumentException if a required property is missing, a seed host is
+     *         malformed, or {@code maxConcurrentRequests} is not positive
+     * @throws Exception if the configuration cannot be parsed or the client cannot be created
+     */
+    @Override
+    public void open(Map<String, String> config) throws Exception {
+        aerospikeSinkConfig = AerospikeSinkConfig.load(config);
+        if (aerospikeSinkConfig.getSeedHosts() == null
+                || aerospikeSinkConfig.getKeyspace() == null
+                || aerospikeSinkConfig.getColumnName() == null) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+
+        // Validate before any network resource is created so a bad value cannot leak a client.
+        int maxConcurrentRequests = aerospikeSinkConfig.getMaxConcurrentRequests();
+        if (maxConcurrentRequests <= 0) {
+            throw new IllegalArgumentException("maxConcurrentRequests must be positive.");
+        }
+
+        writePolicy = new WritePolicy();
+        writePolicy.maxRetries = aerospikeSinkConfig.getRetries();
+        writePolicy.setTimeout(aerospikeSinkConfig.getTimeoutMs());
+
+        // The event loops have to exist before the client: createClient() registers them on
+        // the ClientPolicy, which the async client API needs in order to run commands on them.
+        eventLoops = new NioEventLoops(new EventPolicy(), 1);
+        eventLoop = eventLoops.next();
+        createClient();
+
+        queue = new LinkedBlockingDeque<>(maxConcurrentRequests);
+        for (int i = 0; i < maxConcurrentRequests; ++i) {
+            queue.put(new AWriteListener(queue));
+        }
+    }
+
+    /**
+     * Closes the client and then the event loops. Safe to call when {@link #open(Map)} failed
+     * part-way and some of the resources were never created.
+     */
+    @Override
+    public void close() throws Exception {
+        if (client != null) {
+            client.close();
+        }
+        if (eventLoops != null) {
+            eventLoops.close();
+        }
+        LOG.info("Connection Closed");
+    }
+
+    /**
+     * Asynchronously writes one tuple. Blocks while all {@code maxConcurrentRequests}
+     * listeners are in use.
+     *
+     * @param tuple key/value pair to store
+     * @return a future completed with {@code null} on success, or exceptionally with the
+     *         Aerospike error, or with the {@link InterruptedException} if the calling thread
+     *         was interrupted while waiting for a free listener
+     */
+    @Override
+    public CompletableFuture<Void> write(KeyValue<K, V> tuple) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        Key key = new Key(aerospikeSinkConfig.getKeyspace(),
+                aerospikeSinkConfig.getKeySet(),
+                tuple.getKey().toString());
+        Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(tuple.getValue()));
+        AWriteListener listener;
+        try {
+            listener = queue.take();
+        } catch (InterruptedException ex) {
+            // Restore the flag so callers further up the stack can still observe the interrupt.
+            Thread.currentThread().interrupt();
+            future.completeExceptionally(ex);
+            return future;
+        }
+        listener.setFuture(future);
+        client.put(eventLoop, listener, writePolicy, key, bin);
+        return future;
+    }
+
+    /**
+     * Builds the client from the comma-separated {@code host:port} seed list and the optional
+     * credentials, registering the sink's event loops on the client policy.
+     *
+     * @throws IllegalArgumentException if a seed entry is not of the form {@code host:port}
+     */
+    private void createClient() {
+        String[] hosts = aerospikeSinkConfig.getSeedHosts().split(",");
+        Host[] aeroSpikeHosts = new Host[hosts.length];
+        for (int i = 0; i < hosts.length; ++i) {
+            String[] hostPort = hosts[i].trim().split(":");
+            if (hostPort.length != 2 || hostPort[0].isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Invalid Seed Hosts: expected host:port but got '" + hosts[i] + "'");
+            }
+            // A non-numeric port raises NumberFormatException, itself an IllegalArgumentException.
+            aeroSpikeHosts[i] = new Host(hostPort[0], Integer.parseInt(hostPort[1].trim()));
+        }
+        ClientPolicy policy = new ClientPolicy();
+        // Credentials are applied only when both user name and password are non-empty.
+        if (aerospikeSinkConfig.getUserName() != null && !aerospikeSinkConfig.getUserName().isEmpty()
+            && aerospikeSinkConfig.getPassword() != null && !aerospikeSinkConfig.getPassword().isEmpty()) {
+            policy.user = aerospikeSinkConfig.getUserName();
+            policy.password = aerospikeSinkConfig.getPassword();
+        }
+        policy.eventLoops = eventLoops;
+        client = new AerospikeClient(policy, aeroSpikeHosts);
+    }
+
+    /**
+     * Reusable write callback: completes the future of the current request and then puts
+     * itself back into the pool it was taken from.
+     */
+    private static class AWriteListener implements WriteListener {
+        private CompletableFuture<Void> future;
+        private final BlockingQueue<AWriteListener> queue;
+
+        public AWriteListener(BlockingQueue<AWriteListener> queue) {
+            this.queue = queue;
+        }
+
+        public void setFuture(CompletableFuture<Void> future) {
+            this.future = future;
+        }
+
+        @Override
+        public void onSuccess(Key key) {
+            if (future != null) {
+                future.complete(null);
+            }
+            release();
+        }
+
+        @Override
+        public void onFailure(AerospikeException e) {
+            if (future != null) {
+                future.completeExceptionally(e);
+            }
+            release();
+        }
+
+        // Returns this listener to the pool so that a waiting write() can proceed.
+        private void release() {
+            try {
+                queue.put(this);
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while being added to the queue", ex);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
new file mode 100644
index 0000000..72f530b
--- /dev/null
+++ b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.aerospike;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration of the Aerospike sink.
+ *
+ * <p>{@code seedHosts}, {@code keyspace} and {@code columnName} are required by the sink; the
+ * remaining properties are optional and fall back to the defaults declared below. The
+ * password is left out of {@link #toString()} so that printing or logging a configuration
+ * does not expose the credential.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString(exclude = "password")
+@Accessors(chain = true)
+public class AerospikeSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // Comma-separated list of host:port seed nodes.
+    private String seedHosts;
+    // First component of every record key written by the sink.
+    private String keyspace;
+    // Name of the bin that receives the tuple value.
+    private String columnName;
+
+    // Optional
+    private String userName;
+    private String password;
+    // Second component of every record key written by the sink.
+    private String keySet;
+    // Size of the sink's listener pool, i.e. the maximum number of in-flight writes.
+    private int maxConcurrentRequests = 100;
+    private int timeoutMs = 100;
+    private int retries = 1;
+
+
+    /**
+     * Reads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or mapped onto this class
+     */
+    public static AerospikeSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), AerospikeSinkConfig.class);
+    }
+
+    /**
+     * Builds the configuration from a property map by round-tripping it through JSON.
+     *
+     * @param map property names mapped to their string values
+     * @return the parsed configuration
+     * @throws IOException if the map cannot be serialized or mapped onto this class
+     */
+    public static AerospikeSinkConfig load(Map<String, String> map) throws IOException {
+        // One mapper is enough for both the serialization and the parse.
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), AerospikeSinkConfig.class);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-connect/pom.xml b/pulsar-connect/pom.xml
index 702d0ad..38f9e63 100644
--- a/pulsar-connect/pom.xml
+++ b/pulsar-connect/pom.xml
@@ -35,6 +35,7 @@
     <module>core</module>
     <module>twitter</module>
     <module>cassandra</module>
+    <module>aerospike</module>
   </modules>
 
 </project>

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to