This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b7903fa [Issue 3966] [pulsar-io] Specify the RedisSink type as Bytes
by default (#3972)
b7903fa is described below
commit b7903fae328b9b16318675eb2f2cb300ac6cadb1
Author: Fangbin Sun <[email protected]>
AuthorDate: Thu Apr 4 23:51:33 2019 +0800
[Issue 3966] [pulsar-io] Specify the RedisSink type as Bytes by default
(#3972)
* Specify the RedisSink type as Bytes by default.
* Some minor fix
---
.../org/apache/pulsar/io/redis/sink/RedisSink.java | 29 +++------
.../apache/pulsar/io/redis/sink/RedisSinkTest.java | 72 +++++++++-------------
2 files changed, 36 insertions(+), 65 deletions(-)
diff --git
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
index d27448b..095f7a5 100644
---
a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
+++
b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
@@ -49,7 +49,7 @@ import java.util.concurrent.TimeUnit;
configClass = RedisSinkConfig.class
)
@Slf4j
-public class RedisSink<T> implements Sink<T> {
+public class RedisSink implements Sink<byte[]> {
private RedisSinkConfig redisSinkConfig;
@@ -61,7 +61,7 @@ public class RedisSink<T> implements Sink<T> {
private int batchSize;
- private List<Record<T>> incomingList;
+ private List<Record<byte[]>> incomingList;
private ScheduledExecutorService flushExecutor;
@@ -84,7 +84,7 @@ public class RedisSink<T> implements Sink<T> {
}
@Override
- public void write(Record<T> record) throws Exception {
+ public void write(Record<byte[]> record) throws Exception {
int currentSize;
synchronized (this) {
incomingList.add(record);
@@ -108,7 +108,7 @@ public class RedisSink<T> implements Sink<T> {
private void flush() {
final Map<byte[], byte[]> recordsToSet = new ConcurrentHashMap<>();
- final List<Record<T>> recordsToFlush;
+ final List<Record<byte[]>> recordsToFlush;
synchronized (this) {
if (incomingList.isEmpty()) {
@@ -119,11 +119,11 @@ public class RedisSink<T> implements Sink<T> {
}
if (CollectionUtils.isNotEmpty(recordsToFlush)) {
- for (Record<T> record: recordsToFlush) {
+ for (Record<byte[]> record: recordsToFlush) {
try {
// records with null keys or values will be ignored
- byte[] key = toBytes("key", record.getKey().orElse(null));
- byte[] value = toBytes("value", record.getValue());
+ byte[] key = record.getKey().isPresent() ?
record.getKey().get().getBytes(StandardCharsets.UTF_8) : null;
+ byte[] value = record.getValue();
recordsToSet.put(key, value);
} catch (Exception e) {
record.fail();
@@ -155,19 +155,4 @@ public class RedisSink<T> implements Sink<T> {
log.error("Redis mset data interrupted exception ", e);
}
}
-
- private byte[] toBytes(String src, Object obj) {
- final byte[] result;
- if (obj instanceof String) {
- String s = (String) obj;
- result = s.getBytes(StandardCharsets.UTF_8);
- } else if (obj instanceof byte[]) {
- result = (byte[]) obj;
- } else if (null == obj) {
- result = null;
- } else {
- throw new IllegalArgumentException(String.format("The %s for the
record must be String or Bytes.", src));
- }
- return result;
- }
}
diff --git
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
index 455ef89..ce6595d 100644
---
a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
+++
b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java
@@ -18,27 +18,18 @@
*/
package org.apache.pulsar.io.redis.sink;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.io.redis.EmbeddedRedisUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
/**
* redis Sink test
@@ -48,17 +39,6 @@ public class RedisSinkTest {
private EmbeddedRedisUtils embeddedRedisUtils;
- /**
- * A Simple class to test redis class
- */
- @Data
- @ToString
- @EqualsAndHashCode
- public static class Foo {
- private String field1;
- private String field2;
- }
-
@BeforeMethod
public void setUp() throws Exception {
embeddedRedisUtils = new
EmbeddedRedisUtils(getClass().getSimpleName());
@@ -83,26 +63,7 @@ public class RedisSinkTest {
RedisSink sink = new RedisSink();
// prepare a foo Record
- Foo obj = new Foo();
- obj.setField1("FakeFiled1");
- obj.setField2("FakeFiled1");
- AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
-
- byte[] bytes = schema.encode(obj);
- ByteBuf payload = Unpooled.copiedBuffer(bytes);
- AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
-
autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
-
- Message<GenericRecord> message = new MessageImpl("fake_topic_name",
"77:777", configs, payload, autoConsumeSchema);
- Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
- .message(message)
- .topicName("fake_topic_name")
- .build();
-
- log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
- obj.toString(),
- message.getValue().toString(),
- record.getValue().toString());
+ Record<byte[]> record = build("fakeTopic", "fakeKey", "fakeValue");
// open should success
sink.open(configs, null);
@@ -115,4 +76,29 @@ public class RedisSinkTest {
Thread.sleep(1000);
}
+
+ private Record<byte[]> build(String topic, String key, String value) {
+ // prepare a SinkRecord
+ SinkRecord<byte[]> record = new SinkRecord<>(new Record<byte[]>() {
+ @Override
+ public Optional<String> getKey() {
+ return Optional.empty();
+ }
+
+ @Override
+ public byte[] getValue() {
+ return value.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public Optional<String> getDestinationTopic() {
+ if (topic != null) {
+ return Optional.of(topic);
+ } else {
+ return Optional.empty();
+ }
+ }
+ }, value.getBytes(StandardCharsets.UTF_8));
+ return record;
+ }
}