This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ad5fc83 Kinesis-sink consider topic-name as partition-key if record
key empty (#2372)
ad5fc83 is described below
commit ad5fc8384bc38ff79637c7bd316b41e76f7bc27a
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Aug 14 15:41:45 2018 -0700
Kinesis-sink consider topic-name as partition-key if record key empty
(#2372)
---
.../kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 4dda58f..1056f57 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -114,7 +114,7 @@ public class KinesisSink implements Sink<byte[]> {
record.getRecordSequence());
throw new IllegalStateException("kinesis queue has publish
failure");
}
- String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
+ String partitionedKey =
record.getKey().orElse(record.getTopicName().orElse(defaultPartitionedKey));
partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
: partitionedKey; // partitionedKey Length must be at least
one, and at most 256