This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 357284a Specify the RabbitMQSink type as Bytes by default. (#4007)
357284a is described below
commit 357284aabe8f868ed7810f9b5a2c702de0a07df6
Author: Fangbin Sun <[email protected]>
AuthorDate: Wed Apr 10 15:14:50 2019 +0800
Specify the RabbitMQSink type as Bytes by default. (#4007)
---
.../org/apache/pulsar/io/rabbitmq/RabbitMQSink.java | 20 +++-----------------
.../pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java | 13 +++++++------
2 files changed, 10 insertions(+), 23 deletions(-)
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
index 43a980f..50db936 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
@@ -30,7 +30,6 @@ import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
@@ -44,7 +43,7 @@ import java.util.Map;
configClass = RabbitMQSinkConfig.class
)
@Slf4j
-public class RabbitMQSink<T> implements Sink<T> {
+public class RabbitMQSink implements Sink<byte[]> {
private Connection rabbitMQConnection;
private Channel rabbitMQChannel;
@@ -76,8 +75,8 @@ public class RabbitMQSink<T> implements Sink<T> {
}
@Override
- public void write(Record<T> record) {
- byte[] value = toBytes(record.getValue());
+ public void write(Record<byte[]> record) {
+ byte[] value = record.getValue();
try {
rabbitMQChannel.basicPublish(exchangeName, routingKey, null, value);
record.ack();
@@ -96,17 +95,4 @@ public class RabbitMQSink<T> implements Sink<T> {
rabbitMQConnection.close();
}
}
-
- private byte[] toBytes(Object obj) {
- final byte[] result;
- if (obj instanceof String) {
- String s = (String) obj;
- result = s.getBytes(StandardCharsets.UTF_8);
- } else if (obj instanceof byte[]) {
- result = (byte[]) obj;
- } else {
- throw new IllegalArgumentException("The value of the record must be String or Bytes.");
- }
- return result;
- }
}
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
index 80675d1..80ac7da 100644
--- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
@@ -26,6 +26,7 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -68,23 +69,23 @@ public class RabbitMQSinkTest {
sink.open(configs, null);
// write should success
- Record<String> record = build("test-topic", "fakeKey", "fakeValue");
+ Record<byte[]> record = build("test-topic", "fakeKey", "fakeValue");
sink.write(record);
sink.close();
}
- private Record<String> build(String topic, String key, String value) {
+ private Record<byte[]> build(String topic, String key, String value) {
// prepare a SinkRecord
- SinkRecord<String> record = new SinkRecord<>(new Record<String>() {
+ SinkRecord<byte[]> record = new SinkRecord<>(new Record<byte[]>() {
@Override
public Optional<String> getKey() {
return Optional.empty();
}
@Override
- public String getValue() {
- return key;
+ public byte[] getValue() {
+ return value.getBytes(StandardCharsets.UTF_8);
}
@Override
@@ -95,7 +96,7 @@ public class RabbitMQSinkTest {
return Optional.empty();
}
}
- }, value);
+ }, value.getBytes(StandardCharsets.UTF_8));
return record;
}
}