sijie closed pull request #1814: Make the current reference sinks usable
URL: https://github.com/apache/incubator-pulsar/pull/1814
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
index f1390c12d5..18e851944b 100644
---
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
+++
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
@@ -33,7 +33,6 @@
import com.aerospike.client.policy.WritePolicy;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,9 +42,10 @@
import java.util.concurrent.LinkedBlockingDeque;
/**
- * Simple AeroSpike sink
+ * A Simple abstract class for Aerospike sink
+ * Users need to implement extractKeyValue function to use this sink
*/
-public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> {
+public abstract class AerospikeSink<K, V> extends SimpleSink<byte[]> {
private static final Logger LOG =
LoggerFactory.getLogger(AerospikeSink.class);
@@ -83,10 +83,11 @@ public void close() throws Exception {
}
@Override
- public CompletableFuture<Void> write(KeyValue<K, V> record) {
+ public CompletableFuture<Void> write(byte[] record) {
CompletableFuture<Void> future = new CompletableFuture<>();
- Key key = new Key(aerospikeSinkConfig.getKeyspace(),
aerospikeSinkConfig.getKeySet(), record.getKey().toString());
- Bin bin = new Bin(aerospikeSinkConfig.getColumnName(),
Value.getAsBlob(record.getValue()));
+ KeyValue<K, V> keyValue = extractKeyValue(record);
+ Key key = new Key(aerospikeSinkConfig.getKeyspace(),
aerospikeSinkConfig.getKeySet(), keyValue.getKey().toString());
+ Bin bin = new Bin(aerospikeSinkConfig.getColumnName(),
Value.getAsBlob(keyValue.getValue()));
AWriteListener listener = null;
try {
listener = queue.take();
@@ -154,4 +155,6 @@ public void onFailure(AerospikeException e) {
}
}
}
+
+ public abstract KeyValue<K, V> extractKeyValue(byte[] message);
}
\ No newline at end of file
diff --git
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
index 9aa09e92a8..710feb7a60 100644
---
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
+++
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
@@ -29,7 +29,6 @@
import com.google.common.util.concurrent.Futures;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,10 +36,10 @@
import java.util.concurrent.CompletableFuture;
/**
- * Simple Cassandra sink
- * Takes in a KeyValue and writes it to a predefined
keyspace/columnfamily/columnname.
+ * A Simple abstract class for Cassandra sink
+ * Users need to implement extractKeyValue function to use this sink
*/
-public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> {
+public abstract class CassandraSink<K, V> extends SimpleSink<byte[]> {
private static final Logger LOG =
LoggerFactory.getLogger(CassandraSink.class);
@@ -72,8 +71,9 @@ public void close() throws Exception {
}
@Override
- public CompletableFuture<Void> write(KeyValue<K, V> record) {
- BoundStatement bound = statement.bind(record.getKey(),
record.getValue());
+ public CompletableFuture<Void> write(byte[] record) {
+ KeyValue<K, V> keyValue = extractKeyValue(record);
+ BoundStatement bound = statement.bind(keyValue.getKey(),
keyValue.getValue());
ResultSetFuture future = session.executeAsync(bound);
CompletableFuture<Void> completable = new CompletableFuture<Void>();
Futures.addCallback(future,
@@ -108,4 +108,6 @@ private void createClient(String roots) {
session = cluster.connect();
session.execute("USE " + cassandraSinkConfig.getKeyspace());
}
+
+ public abstract KeyValue<K, V> extractKeyValue(byte[] message);
}
\ No newline at end of file
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
index 08ca65274a..5aa9894160 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
@@ -25,7 +25,6 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,9 +36,10 @@
import java.util.concurrent.Future;
/**
- * Simple Kafka Sink to publish messages to a Kafka topic
+ * A Simple abstract class for Kafka sink
+ * Users need to implement extractKeyValue function to use this sink
*/
-public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {
+public abstract class KafkaSink<K, V> extends SimpleSink<byte[]> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
@@ -48,8 +48,9 @@
private KafkaSinkConfig kafkaSinkConfig;
@Override
- public CompletableFuture<Void> write(KeyValue<K, V> message) {
- ProducerRecord<K, V> record = new
ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getKey(),
message.getValue());
+ public CompletableFuture<Void> write(byte[] message) {
+ KeyValue<K, V> keyValue = extractKeyValue(message);
+ ProducerRecord<K, V> record = new
ProducerRecord<>(kafkaSinkConfig.getTopic(), keyValue.getKey(),
keyValue.getValue());
LOG.debug("Record sending to kafka, record={}.", record);
Future f = producer.send(record);
return CompletableFuture.supplyAsync(() -> {
@@ -91,4 +92,6 @@ public void open(Map<String, Object> config) throws Exception
{
LOG.info("Kafka sink started.");
}
+
+ public abstract KeyValue<K, V> extractKeyValue(byte[] message);
}
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services