This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
Advertising
The following commit(s) were added to refs/heads/master by this push:
new e370b97 Added Aerospike Sink (#1545)
e370b97 is described below
commit e370b976649d761042c4b112cea4b6107ee38dd7
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Tue Apr 10 20:45:15 2018 -0700
Added Aerospike Sink (#1545)
---
pom.xml | 1 +
pulsar-connect/aerospike/pom.xml | 67 +++++++++
.../pulsar/connect/aerospike/AerospikeSink.java | 151 +++++++++++++++++++++
.../connect/aerospike/AerospikeSinkConfig.java | 64 +++++++++
pulsar-connect/pom.xml | 1 +
5 files changed, 284 insertions(+)
diff --git a/pom.xml b/pom.xml
index 77b0fa3..a597861 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,6 +143,7 @@ flexible messaging model and an intuitive client
API.</description>
<jctools.version>2.1.1</jctools.version>
<hbc-core.version>2.2.0</hbc-core.version>
<cassandra-driver-core.version>3.4.0</cassandra-driver-core.version>
+ <aerospike-client.version>4.1.5</aerospike-client.version>
<!-- test dependencies -->
<disruptor.version>3.4.0</disruptor.version>
diff --git a/pulsar-connect/aerospike/pom.xml b/pulsar-connect/aerospike/pom.xml
new file mode 100644
index 0000000..5bbd111
--- /dev/null
+++ b/pulsar-connect/aerospike/pom.xml
@@ -0,0 +1,67 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-connect</artifactId>
+ <version>2.0.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-connect-aerospike</artifactId>
+ <name>Pulsar Connect :: Aerospike</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-connect-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.aerospike</groupId>
+ <artifactId>aerospike-client</artifactId>
+ <version>${aerospike-client.version}</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git
a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
new file mode 100644
index 0000000..c6a34ba
--- /dev/null
+++
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.aerospike;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.aerospike.client.*;
+import com.aerospike.client.async.EventLoop;
+import com.aerospike.client.async.EventPolicy;
+import com.aerospike.client.async.NioEventLoops;
+import com.aerospike.client.listener.WriteListener;
+import com.aerospike.client.policy.ClientPolicy;
+import com.aerospike.client.policy.WritePolicy;
+import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.connect.core.Sink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple AeroSpike sink
+ */
+public class AerospikeSink<K, V> implements Sink<KeyValue<K, V>> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AerospikeSink.class);
+
+ // ----- Runtime fields
+ private AerospikeSinkConfig aerospikeSinkConfig;
+ private AerospikeClient client;
+ private WritePolicy writePolicy;
+ private BlockingQueue<AWriteListener> queue;
+ private EventLoop eventLoop;
+
+ @Override
+ public void open(Map<String, String> config) throws Exception {
+ aerospikeSinkConfig = AerospikeSinkConfig.load(config);
+ if (aerospikeSinkConfig.getSeedHosts() == null
+ || aerospikeSinkConfig.getKeyspace() == null
+ || aerospikeSinkConfig.getColumnName() == null) {
+ throw new IllegalArgumentException("Required property not set.");
+ }
+
+ writePolicy = new WritePolicy();
+ writePolicy.maxRetries = aerospikeSinkConfig.getRetries();
+ writePolicy.setTimeout(aerospikeSinkConfig.getTimeoutMs());
+ createClient();
+ queue = new
LinkedBlockingDeque<>(aerospikeSinkConfig.getMaxConcurrentRequests());
+ for (int i = 0; i < aerospikeSinkConfig.getMaxConcurrentRequests();
++i) {
+ queue.put(new AWriteListener(queue));
+ }
+ eventLoop = new NioEventLoops(new EventPolicy(), 1).next();
+ }
+
+ @Override
+ public void close() throws Exception {
+ client.close();
+ LOG.info("Connection Closed");
+ }
+
+ @Override
+ public CompletableFuture<Void> write(KeyValue<K, V> tuple) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ Key key = new Key(aerospikeSinkConfig.getKeyspace(),
aerospikeSinkConfig.getKeySet(), tuple.getKey().toString());
+ Bin bin = new Bin(aerospikeSinkConfig.getColumnName(),
Value.getAsBlob(tuple.getValue()));
+ AWriteListener listener = null;
+ try {
+ listener = queue.take();
+ } catch (InterruptedException ex) {
+ future.completeExceptionally(ex);
+ return future;
+ }
+ listener.setFuture(future);
+ client.put(eventLoop, listener, writePolicy, key, bin);
+ return future;
+ }
+
+ private void createClient() {
+ String[] hosts = aerospikeSinkConfig.getSeedHosts().split(",");
+ if (hosts.length <= 0) {
+ throw new RuntimeException("Invalid Seed Hosts");
+ }
+ Host[] aeroSpikeHosts = new Host[hosts.length];
+ for (int i = 0; i < hosts.length; ++i) {
+ String[] hostPort = hosts[i].split(":");
+ aeroSpikeHosts[i] = new Host(hostPort[0],
Integer.valueOf(hostPort[1]));
+ }
+ ClientPolicy policy = new ClientPolicy();
+ if (aerospikeSinkConfig.getUserName() != null &&
!aerospikeSinkConfig.getUserName().isEmpty()
+ && aerospikeSinkConfig.getPassword() != null &&
!aerospikeSinkConfig.getPassword().isEmpty()) {
+ policy.user = aerospikeSinkConfig.getUserName();
+ policy.password = aerospikeSinkConfig.getPassword();
+ }
+ client = new AerospikeClient(policy, aeroSpikeHosts);
+ }
+
+ private class AWriteListener implements WriteListener {
+ private CompletableFuture<Void> future;
+ private BlockingQueue<AWriteListener> queue;
+
+ public AWriteListener(BlockingQueue<AWriteListener> queue) {
+ this.queue = queue;
+ }
+
+ public void setFuture(CompletableFuture<Void> future) {
+ this.future = future;
+ }
+
+ @Override
+ public void onSuccess(Key key) {
+ if (future != null) {
+ future.complete(null);
+ }
+ try {
+ queue.put(this);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException("Interrupted while being added to
the queue" ,ex);
+ }
+ }
+
+ @Override
+ public void onFailure(AerospikeException e) {
+ if (future != null) {
+ future.completeExceptionally(e);
+ }
+ try {
+ queue.put(this);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException("Interrupted while being added to
the queue", ex);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
new file mode 100644
index 0000000..72f530b
--- /dev/null
+++
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.aerospike;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class AerospikeSinkConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private String seedHosts;
+ private String keyspace;
+ private String columnName;
+
+ // Optional
+ private String userName;
+ private String password;
+ private String keySet;
+ private int maxConcurrentRequests = 100;
+ private int timeoutMs = 100;
+ private int retries = 1;
+
+
+ public static AerospikeSinkConfig load(String yamlFile) throws IOException
{
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return mapper.readValue(new File(yamlFile), AerospikeSinkConfig.class);
+ }
+
+ public static AerospikeSinkConfig load(Map<String, String> map) throws
IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(new ObjectMapper().writeValueAsString(map),
AerospikeSinkConfig.class);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-connect/pom.xml b/pulsar-connect/pom.xml
index 702d0ad..38f9e63 100644
--- a/pulsar-connect/pom.xml
+++ b/pulsar-connect/pom.xml
@@ -35,6 +35,7 @@
<module>core</module>
<module>twitter</module>
<module>cassandra</module>
+ <module>aerospike</module>
</modules>
</project>
--
To stop receiving notification emails like this one, please contact
si...@apache.org.