This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 357284a  Specify the RabbitMQSink type as Bytes by default. (#4007)
357284a is described below

commit 357284aabe8f868ed7810f9b5a2c702de0a07df6
Author: Fangbin Sun <[email protected]>
AuthorDate: Wed Apr 10 15:14:50 2019 +0800

    Specify the RabbitMQSink type as Bytes by default. (#4007)
---
 .../org/apache/pulsar/io/rabbitmq/RabbitMQSink.java  | 20 +++-----------------
 .../pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java    | 13 +++++++------
 2 files changed, 10 insertions(+), 23 deletions(-)

diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
index 43a980f..50db936 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
@@ -30,7 +30,6 @@ import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 /**
@@ -44,7 +43,7 @@ import java.util.Map;
     configClass = RabbitMQSinkConfig.class
 )
 @Slf4j
-public class RabbitMQSink<T> implements Sink<T> {
+public class RabbitMQSink implements Sink<byte[]> {
 
     private Connection rabbitMQConnection;
     private Channel rabbitMQChannel;
@@ -76,8 +75,8 @@ public class RabbitMQSink<T> implements Sink<T> {
     }
 
     @Override
-    public void write(Record<T> record) {
-        byte[] value = toBytes(record.getValue());
+    public void write(Record<byte[]> record) {
+        byte[] value = record.getValue();
         try {
             rabbitMQChannel.basicPublish(exchangeName, routingKey, null, value);
             record.ack();
@@ -96,17 +95,4 @@ public class RabbitMQSink<T> implements Sink<T> {
             rabbitMQConnection.close();
         }
     }
-
-    private byte[] toBytes(Object obj) {
-        final byte[] result;
-        if (obj instanceof String) {
-            String s = (String) obj;
-            result = s.getBytes(StandardCharsets.UTF_8);
-        } else if (obj instanceof byte[]) {
-            result = (byte[]) obj;
-        } else {
-            throw new IllegalArgumentException("The value of the record must be String or Bytes.");
-        }
-        return result;
-    }
 }
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
index 80675d1..80ac7da 100644
--- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
@@ -26,6 +26,7 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -68,23 +69,23 @@ public class RabbitMQSinkTest {
         sink.open(configs, null);
 
         // write should success
-        Record<String> record = build("test-topic", "fakeKey", "fakeValue");
+        Record<byte[]> record = build("test-topic", "fakeKey", "fakeValue");
         sink.write(record);
 
         sink.close();
     }
 
-    private Record<String> build(String topic, String key, String value) {
+    private Record<byte[]> build(String topic, String key, String value) {
         // prepare a SinkRecord
-        SinkRecord<String> record = new SinkRecord<>(new Record<String>() {
+        SinkRecord<byte[]> record = new SinkRecord<>(new Record<byte[]>() {
             @Override
             public Optional<String> getKey() {
                 return Optional.empty();
             }
 
             @Override
-            public String getValue() {
-                return key;
+            public byte[] getValue() {
+                return value.getBytes(StandardCharsets.UTF_8);
             }
 
             @Override
@@ -95,7 +96,7 @@ public class RabbitMQSinkTest {
                     return Optional.empty();
                 }
             }
-        }, value);
+        }, value.getBytes(StandardCharsets.UTF_8));
         return record;
     }
 }

Reply via email to