This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ad5fc83  Kinesis-sink consider topic-name as partition-key if record key empty (#2372)
ad5fc83 is described below

commit ad5fc8384bc38ff79637c7bd316b41e76f7bc27a
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Aug 14 15:41:45 2018 -0700

    Kinesis-sink consider topic-name as partition-key if record key empty (#2372)
---
 .../kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 4dda58f..1056f57 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -114,7 +114,7 @@ public class KinesisSink implements Sink<byte[]> {
                     record.getRecordSequence());
             throw new IllegalStateException("kinesis queue has publish failure");
         }
-        String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
+        String partitionedKey = record.getKey().orElse(record.getTopicName().orElse(defaultPartitionedKey));
         partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
                 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
                 : partitionedKey; // partitionedKey Length must be at least one, and at most 256
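
For readers skimming the hunk above, the fallback order it introduces (record key, then topic name, then the default key) can be sketched in isolation. This is a minimal illustration and not the KinesisSink code itself: the class name PartitionKeyFallback, the method choosePartitionKey, and the constant values below are assumptions made for the example; only the orElse chain and the truncation expression mirror the diff.

```java
import java.util.Optional;

// Hypothetical stand-alone sketch of the fallback chain shown in the diff.
class PartitionKeyFallback {
    // Assumed values for illustration; the diff does not show where
    // KinesisSink defines defaultPartitionedKey or maxPartitionedKeyLength.
    static final String DEFAULT_PARTITION_KEY = "default";
    static final int MAX_PARTITION_KEY_LENGTH = 256;

    // Prefer the record key, then the topic name, then the default key.
    // Note: orElse only falls back when the Optional is empty; an
    // Optional holding an empty string would be used as-is.
    static String choosePartitionKey(Optional<String> recordKey, Optional<String> topicName) {
        String key = recordKey.orElse(topicName.orElse(DEFAULT_PARTITION_KEY));
        // Same truncation expression as the diff: keys longer than the
        // maximum are cut to (max - 1) characters.
        return key.length() > MAX_PARTITION_KEY_LENGTH
                ? key.substring(0, MAX_PARTITION_KEY_LENGTH - 1)
                : key;
    }

    public static void main(String[] args) {
        // Record key present: it is used directly.
        System.out.println(choosePartitionKey(Optional.of("user-42"), Optional.of("my-topic")));
        // No record key: the topic name is used.
        System.out.println(choosePartitionKey(Optional.empty(), Optional.of("my-topic")));
        // Neither present: the default key is used.
        System.out.println(choosePartitionKey(Optional.empty(), Optional.empty()));
    }
}
```

Before this commit, the second case would have gone straight to the default key, so every keyless record shared one partition key regardless of topic.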
