This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 703305b  Kinesis-sink consider topic-name as partition-key if record key empty (#2372)
703305b is described below

commit 703305b5426856bab7bab30a41e4f242e7782dc7
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Aug 14 15:41:45 2018 -0700

    Kinesis-sink consider topic-name as partition-key if record key empty (#2372)
---
 .../kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 67de21a..cdfadde 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -114,7 +114,7 @@ public class KinesisSink implements Sink<byte[]> {
                     record.getRecordSequence());
             throw new IllegalStateException("kinesis queue has publish failure");
         }
-        String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
+        String partitionedKey = record.getKey().orElse(record.getTopicName().orElse(defaultPartitionedKey));
         partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
                 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
                 : partitionedKey; // partitionedKey Length must be at least one, and at most 256
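
The fallback chain introduced by this change can be sketched in isolation as follows. This is a minimal standalone illustration, not the connector's code: the class name `PartitionKeySketch`, the method `choosePartitionKey`, and the constant values `"default"` and `256` are assumptions made for the example, since the diff does not show how `defaultPartitionedKey` and `maxPartitionedKeyLength` are defined. The truncation mirrors the context lines of the diff as written.

```java
import java.util.Optional;

public class PartitionKeySketch {
    // Assumed values for illustration only; the diff does not show these definitions.
    static final String DEFAULT_PARTITIONED_KEY = "default";
    static final int MAX_PARTITIONED_KEY_LENGTH = 256;

    // Hypothetical helper: record key first, then topic name, then the default.
    static String choosePartitionKey(Optional<String> recordKey, Optional<String> topicName) {
        String key = recordKey.orElse(topicName.orElse(DEFAULT_PARTITIONED_KEY));
        // Same truncation expression as the diff's context lines:
        // an over-long key is cut to MAX_PARTITIONED_KEY_LENGTH - 1 characters.
        return key.length() > MAX_PARTITIONED_KEY_LENGTH
                ? key.substring(0, MAX_PARTITIONED_KEY_LENGTH - 1)
                : key;
    }

    public static void main(String[] args) {
        // Record key present: it wins over the topic name.
        System.out.println(choosePartitionKey(Optional.of("k1"), Optional.of("my-topic")));
        // Record key absent: the topic name is used.
        System.out.println(choosePartitionKey(Optional.empty(), Optional.of("my-topic")));
        // Both absent: the default key is used.
        System.out.println(choosePartitionKey(Optional.empty(), Optional.empty()));
    }
}
```

Note that `Optional.orElse` only falls back when the `Optional` is empty. A key that is present but is the empty string `""` would be used as-is in this sketch.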
