This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b7903fa  [Issue 3966] [pulsar-io] Specify the RedisSink type as Bytes 
by default (#3972)
b7903fa is described below

commit b7903fae328b9b16318675eb2f2cb300ac6cadb1
Author: Fangbin Sun <[email protected]>
AuthorDate: Thu Apr 4 23:51:33 2019 +0800

    [Issue 3966] [pulsar-io] Specify the RedisSink type as Bytes by default 
(#3972)
    
    * Specify the RedisSink type as Bytes by default.
    
    * Some minor fix
---
 .../org/apache/pulsar/io/redis/sink/RedisSink.java | 29 +++------
 .../apache/pulsar/io/redis/sink/RedisSinkTest.java | 72 +++++++++-------------
 2 files changed, 36 insertions(+), 65 deletions(-)

diff --git 
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java 
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
index d27448b..095f7a5 100644
--- 
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
+++ 
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
@@ -49,7 +49,7 @@ import java.util.concurrent.TimeUnit;
     configClass = RedisSinkConfig.class
 )
 @Slf4j
-public class RedisSink<T> implements Sink<T> {
+public class RedisSink implements Sink<byte[]> {
 
     private RedisSinkConfig redisSinkConfig;
 
@@ -61,7 +61,7 @@ public class RedisSink<T> implements Sink<T> {
 
     private int batchSize;
 
-    private List<Record<T>> incomingList;
+    private List<Record<byte[]>> incomingList;
 
     private ScheduledExecutorService flushExecutor;
 
@@ -84,7 +84,7 @@ public class RedisSink<T> implements Sink<T> {
     }
 
     @Override
-    public void write(Record<T> record) throws Exception {
+    public void write(Record<byte[]> record) throws Exception {
         int currentSize;
         synchronized (this) {
             incomingList.add(record);
@@ -108,7 +108,7 @@ public class RedisSink<T> implements Sink<T> {
 
     private void flush() {
         final Map<byte[], byte[]> recordsToSet = new ConcurrentHashMap<>();
-        final List<Record<T>> recordsToFlush;
+        final List<Record<byte[]>> recordsToFlush;
 
         synchronized (this) {
             if (incomingList.isEmpty()) {
@@ -119,11 +119,11 @@ public class RedisSink<T> implements Sink<T> {
         }
 
         if (CollectionUtils.isNotEmpty(recordsToFlush)) {
-            for (Record<T> record: recordsToFlush) {
+            for (Record<byte[]> record: recordsToFlush) {
                 try {
                     // records with null keys or values will be ignored
-                    byte[] key = toBytes("key", record.getKey().orElse(null));
-                    byte[] value = toBytes("value", record.getValue());
+                    byte[] key = record.getKey().isPresent() ? 
record.getKey().get().getBytes(StandardCharsets.UTF_8) : null;
+                    byte[] value = record.getValue();
                     recordsToSet.put(key, value);
                 } catch (Exception e) {
                     record.fail();
@@ -155,19 +155,4 @@ public class RedisSink<T> implements Sink<T> {
             log.error("Redis mset data interrupted exception ", e);
         }
     }
-
-    private byte[] toBytes(String src, Object obj) {
-        final byte[] result;
-        if (obj instanceof String) {
-            String s = (String) obj;
-            result = s.getBytes(StandardCharsets.UTF_8);
-        } else if (obj instanceof byte[]) {
-            result = (byte[]) obj;
-        } else if (null == obj) {
-            result = null;
-        } else {
-            throw new IllegalArgumentException(String.format("The %s for the 
record must be String or Bytes.", src));
-        }
-        return result;
-    }
 }
diff --git 
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
 
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
index 455ef89..ce6595d 100644
--- 
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
+++ 
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
@@ -18,27 +18,18 @@
  */
 package org.apache.pulsar.io.redis.sink;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.instance.SinkRecord;
 import org.apache.pulsar.io.redis.EmbeddedRedisUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * redis Sink test
@@ -48,17 +39,6 @@ public class RedisSinkTest {
 
     private EmbeddedRedisUtils embeddedRedisUtils;
 
-    /**
-     * A Simple class to test redis class
-     */
-    @Data
-    @ToString
-    @EqualsAndHashCode
-    public static class Foo {
-        private String field1;
-        private String field2;
-    }
-
     @BeforeMethod
     public void setUp() throws Exception {
         embeddedRedisUtils = new 
EmbeddedRedisUtils(getClass().getSimpleName());
@@ -83,26 +63,7 @@ public class RedisSinkTest {
         RedisSink sink = new RedisSink();
 
         // prepare a foo Record
-        Foo obj = new Foo();
-        obj.setField1("FakeFiled1");
-        obj.setField2("FakeFiled1");
-        AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
-
-        byte[] bytes = schema.encode(obj);
-        ByteBuf payload = Unpooled.copiedBuffer(bytes);
-        AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
-        
autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
-
-        Message<GenericRecord> message = new MessageImpl("fake_topic_name", 
"77:777", configs, payload, autoConsumeSchema);
-        Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
-            .message(message)
-            .topicName("fake_topic_name")
-            .build();
-
-        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
-            obj.toString(),
-            message.getValue().toString(),
-            record.getValue().toString());
+        Record<byte[]> record = build("fakeTopic", "fakeKey", "fakeValue");
 
         // open should success
         sink.open(configs, null);
@@ -115,4 +76,29 @@ public class RedisSinkTest {
         Thread.sleep(1000);
 
     }
+
+    private Record<byte[]> build(String topic, String key, String value) {
+        // prepare a SinkRecord
+        SinkRecord<byte[]> record = new SinkRecord<>(new Record<byte[]>() {
+            @Override
+            public Optional<String> getKey() {
+                return Optional.empty();
+            }
+
+            @Override
+            public byte[] getValue() {
+                return value.getBytes(StandardCharsets.UTF_8);
+            }
+
+            @Override
+            public Optional<String> getDestinationTopic() {
+                if (topic != null) {
+                    return Optional.of(topic);
+                } else {
+                    return Optional.empty();
+                }
+            }
+        }, value.getBytes(StandardCharsets.UTF_8));
+        return record;
+    }
 }

Reply via email to