Hi Stuart,

Thanks for the tip. I've come across this configuration as well. My issue isn't so much that I want to change Kafka's partitioner as that I want to ensure that Samza is using Kafka's default partitioner (although your point about explicitly defining the same partitioner in every client is a good one).

From what I'm seeing in the Samza source, if an OutgoingMessageEnvelope is instantiated with the most verbose constructor and the partitionKey is explicitly set to null, Samza's partitioner should be bypassed and the default Kafka partitioner should be used with the key[0]. At that point, the key will be passed into the Kafka producer, which should then use the default murmur2 partitioner, since Samza hasn't provided an explicit partition[1][2][3][4]. Does that seem right?
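For reference, here's a rough Python sketch of what I understand the default partitioner to do for a record with a non-null key, as I read [4]: murmur2 over the serialized key bytes, mask off the sign bit, then take the result modulo the partition count. The function names are my own, and I'm assuming the key is a string serialized as UTF-8 (the StringSerializer default), so treat this as an illustration rather than a verified port.

```python
MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """32-bit murmur2 as I read it in Kafka's Utils.murmur2 (signed result)."""
    length = len(data)
    seed = 0x9747B28C  # fixed seed used by the Kafka client
    m = 0x5BD1E995     # mixing constants
    r = 24

    h = (seed ^ length) & MASK32

    # Mix the input four bytes at a time, read as little-endian words.
    for i in range(length // 4):
        k = int.from_bytes(data[i * 4:i * 4 + 4], "little")
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = (h * m) & MASK32
        h ^= k

    # Fold in the trailing 1-3 bytes (mirrors the fall-through switch in Java).
    tail = length & ~3
    rem = length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15

    # Convert to a signed 32-bit value to match Java's int.
    return h - (1 << 32) if h & 0x80000000 else h


def kafka_default_partition(key_bytes: bytes, num_partitions: int) -> int:
    """Partition for a keyed record: toPositive(murmur2(key)) % numPartitions."""
    return (murmur2(key_bytes) & 0x7FFFFFFF) % num_partitions


if __name__ == "__main__":
    # Hypothetical key and partition count, for illustration only.
    print(kafka_default_partition("order-1234".encode("utf-8"), 12))
```

If Samza really does hand the key straight through, then a key produced by Samza and the same key run through this function (with the topic's partition count) should land on the same partition.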
Sorry about all of the Github links, just trying to be specific about my reasoning!

Cheers,
Malcolm McFarland
Cavulus

[0] https://github.com/apache/samza/blob/1.7.0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L97-L101
[1] https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L895
[2] https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java#L44-L47
[3] https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1268-L1271
[4] https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L52-L60

On Wed, Dec 14, 2022 at 11:45 PM Stuart Perks <stuart.m.pe...@gmail.com> wrote:

> Hi Malcolm,
>
> You should be able to override the following producer config for
> *partitioner.class*:
> https://kafka.apache.org/24/documentation.html#producerconfigs
>
> This can be done as follows via Samza config
> systems.*system-name*.producer.* :
> https://samza.apache.org/learn/documentation/latest/jobs/samza-configurations.html#kafka
>
> Caveat I haven’t tried this but should work from docs.
>
> But I would say ideally to be safe you should rekey/repartition on your
> consumer to protect against future producers that differ or producers
> accidentally changing the partitioner.
>
> Hope it helps,
>
> Stuart
>
> On 15 Dec 2022, at 04:01, Malcolm McFarland <mmcfarl...@cavulus.com>
> wrote:
>
> Hey folks,
>
> I'm working on a system where several different Kafka clients (including
> Samza) are producing into the same Kafka topic. It's necessary for each of
> these clients to calculate the same partition hash for the same key input
> to ensure consistent message ordering (there are some asynchronous actions
> that need to be ordered across systems). I've been able to get our non-JVM
> Kafka clients to calculate partition identifiers (using the murmur2 hashing
> algorithm) in the same manner as the official Java Kafka producers.
> However, it looks like Samza uses its own hashing algorithm[0]; this is
> fine for maintaining order if it's just Samza producing into a topic, but
> it's not so great if Samza is just one system of many that are working on a
> multi-stage task.
>
> I've dug through the Samza and Kafka codebases quite a bit over the last
> few days, and I'm at a loss about how to get Samza to hash partition
> indexes in a way that's compatible with other producers. I've tried
> implementing Samza's hashing algorithm in other clients (ie with [1]), but
> cannot for the life of me get equivalent partition calculations in a
> non-JVM language.
>
> Does anybody know a) if it's possible to define a custom key-to-partition
> hashing algorithm in Samza, or b) if there is a reliable general-purpose
> algorithm that can create the same results as Samza's algorithm?
>
> Cheers,
> Malcolm McFarland
> Cavulus
>
> [0] https://github.com/apache/samza/blob/1.7.0/samza-kafka/src/main/java/org/apache/samza/util/KafkaUtil.java#L47-L49
> [1] https://stackoverflow.com/questions/40303333/how-to-replicate-java-hashcode-in-c-language