[ 
https://issues.apache.org/jira/browse/SAMZA-839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617909#comment-16617909
 ] 

Dong Lin commented on SAMZA-839:
--------------------------------

Hey [~kishorenc], [~michal.harish], [~voronenko-da],

My understanding of the current problem is that, when partitionKey != null, 
Samza hashes the partitionKey with an algorithm that is hardcoded in Samza 
code, and that algorithm differs from the one used by KafkaProducer.
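
To make the mismatch concrete, here is a minimal sketch of the two expressions 
quoted in the issue description below. The class and method names are 
hypothetical. The murmur2 body is my own transcription of the 32-bit 
MurmurHash2 variant used by the Kafka client, and the sign-bit mask stands in 
for `Utils.abs`; treat both as assumptions and check them against the Kafka 
source. It also assumes keys are serialized as UTF-8 strings.

```java
import java.nio.charset.StandardCharsets;

final class PartitionHashSketch {

    // Mirrors the Samza expression from the issue:
    // abs(partitionKey.hashCode()) % numPartitions
    static int samzaStylePartition(Object partitionKey, int numPartitions) {
        return Math.abs(partitionKey.hashCode()) % numPartitions;
    }

    // 32-bit MurmurHash2 (assumed to match the Kafka client's variant and seed).
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;

        // Mix four bytes at a time, little-endian.
        for (int i = 0; i < length / 4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Fold in the remaining 1 to 3 bytes; the fall-through is intentional.
        final int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Mirrors the Kafka expression from the issue:
    // Utils.abs(Utils.murmur2(record.key())) % numPartitions
    // Assumption: the sign bit is masked off rather than negated.
    static int kafkaStylePartition(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        final int numPartitions = 8;
        for (String key : new String[] {"user-1", "user-2", "user-3", "user-4"}) {
            int samza = samzaStylePartition(key, numPartitions);
            int kafka = kafkaStylePartition(
                    key.getBytes(StandardCharsets.UTF_8), numPartitions);
            System.out.println(key + ": samza-style=" + samza + ", kafka-style=" + kafka);
        }
    }
}
```

Because the two functions are unrelated, the same key generally maps to 
different partitions, which is what breaks a join between a topic written by 
Samza and a topic written by a default Kafka producer.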

A bit of history to explain how we got here. The algorithm hardcoded in Samza 
is the one used by the old Kafka producer. A few years ago Apache Kafka 
introduced a new KafkaProducer that offers a better hash algorithm. That 
algorithm is not backward compatible with the old one, and there is no good 
path for users to migrate from the old producer to the new one without 
breaking in-order delivery semantics, at least temporarily.

As [~michal.harish] has mentioned, the API provided by Samza is confusing, and 
users cannot easily understand the difference between partitionKey and key. We 
will definitely come up with a long-term solution as a Samza Improvement 
Proposal. I have a proposal and am discussing it within LinkedIn first.

In the short term, you can most likely address the issue by using the 
constructor `OutgoingMessageEnvelope(SystemStream systemStream, Object 
partitionKey, Object key, Object message)` with `partitionKey=null`. In this 
case, the `key` is passed to the KafkaProducer, and the KafkaProducer 
determines the partition. A custom partitioner specified by partitioner.class 
in the KafkaProducer config is also applied properly.
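
The routing described above can be modelled with a small self-contained 
sketch. It is not Samza's actual code: `PartitionRoutingSketch`, 
`ProducerPartitioner` and `resolvePartition` are hypothetical names, and the 
toy partitioner in `main` only stands in for whatever the KafkaProducer is 
configured with.

```java
final class PartitionRoutingSketch {

    // Hypothetical stand-in for the partitioner the KafkaProducer is configured
    // with (its default, or the class named by partitioner.class).
    interface ProducerPartitioner {
        int partition(Object key, int numPartitions);
    }

    // Models the behaviour described in this comment:
    // - partitionKey != null: Samza picks the partition with its hardcoded hash.
    // - partitionKey == null: the key is handed to the producer, which decides.
    static int resolvePartition(Object partitionKey, Object key, int numPartitions,
                                ProducerPartitioner producerPartitioner) {
        if (partitionKey != null) {
            return Math.abs(partitionKey.hashCode()) % numPartitions;
        }
        return producerPartitioner.partition(key, numPartitions);
    }

    public static void main(String[] args) {
        // Toy partitioner for illustration only; a real producer would hash the
        // serialized key instead.
        ProducerPartitioner byLength = (key, n) -> key.toString().length() % n;

        // partitionKey set: the producer's partitioner is never consulted.
        System.out.println(resolvePartition("user-1", "user-1", 8, byLength));

        // partitionKey null: the producer's partitioner decides.
        System.out.println(resolvePartition(null, "user-1", 8, byLength));
    }
}
```

In a real task this corresponds to sending `new 
OutgoingMessageEnvelope(systemStream, null, key, message)`, so that the second 
branch is always taken.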

Note that this may not work if you are producing to a compacted Kafka topic and 
the key currently differs from the partitionKey in OutgoingMessageEnvelope. 
That is probably a rare case. Can you confirm that the short-term fix 
described above addresses your use case for now?

> KafkaSystemProducer should use the same partitioning hash function as Kafka's 
> producer
> --------------------------------------------------------------------------------------
>
>                 Key: SAMZA-839
>                 URL: https://issues.apache.org/jira/browse/SAMZA-839
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.9.1
>            Reporter: Kishore Nallan
>            Assignee: Dong Lin
>            Priority: Major
>
> Samza's KafkaSystemProducer class generates the partition key using:
> {{abs(envelope.getPartitionKey.hashCode()) % numPartitions}}
> However, Kafka's producer generates the partition key this way:
> {{Utils.abs(Utils.murmur2(record.key())) % numPartitions}}
> This makes it difficult for me to join 2 data sources on a common key when 
> one source is produced by Samza and the other by a default Kafka producer.
> As a work-around, I have to modify the upstream job (that uses the default 
> kafka producer) to write with an explicit partition key using Samza's hashing 
> logic. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
