Hi, Michal,

Yeah, unfortunately, based on the current code base, the only short-term solution would be your proposal. The issue is the following:

1. The Kafka producer only applies the specified partitioner.class to the ProducerRecord key. Samza allows the user to specify a partition key that is different from the ProducerRecord key, and the Kafka producer's partitioner.class does not support that.
2. The Kafka producer made a backward-incompatible change in the default Partitioner between 0.8.1 and 0.8.2.
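As a rough illustration of point 1, here is a simplified Scala model of producer-side partition selection. It is not Kafka's actual source: `PartitionSelection`, `Record`, `choosePartition` and `toyKeyPartitioner` are hypothetical names, and null-key handling is omitted. It shows the two paths: an explicit partition on the record is used as-is, and only otherwise does a partitioner compute one, from the record key alone. A Samza partition key that differs from the record key never reaches the partitioner.

```scala
import java.nio.charset.StandardCharsets
import java.util.Arrays

// Simplified, hypothetical model of producer-side partition selection
// (not Kafka's actual source). Null-key handling is omitted.
object PartitionSelection {
  // The record carries only the Kafka key and an optional explicit partition;
  // Samza's separate partition key is not part of it.
  final case class Record(key: Array[Byte], partition: Option[Int])

  // Stand-in for a key-based partitioner: non-negative hash of the key bytes modulo n.
  def toyKeyPartitioner(key: Array[Byte], numPartitions: Int): Int =
    (Arrays.hashCode(key) & 0x7fffffff) % numPartitions

  def choosePartition(record: Record, numPartitions: Int,
                      keyPartitioner: (Array[Byte], Int) => Int): Int =
    record.partition match {
      // An explicit partition wins; the partitioner is never consulted.
      case Some(p) =>
        require(p >= 0 && p < numPartitions, s"partition $p out of range")
        p
      // Otherwise the partitioner sees only the record key.
      case None => keyPartitioner(record.key, numPartitions)
    }

  def main(args: Array[String]): Unit = {
    val key = "user-42".getBytes(StandardCharsets.UTF_8)
    println(choosePartition(Record(key, Some(3)), 8, toyKeyPartitioner)) // 3
    println(choosePartition(Record(key, None), 8, toyKeyPartitioner))    // derived from the key
  }
}
```

Passing the partitioner in as a function keeps the model neutral about which hashing a given Kafka version uses, which is exactly what changed between 0.8.1 and 0.8.2.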
There is another JIRA ticket (SAMZA-839) open to track this issue. I would suggest that you join the discussion and keep watching this ticket.

Thanks!
-Yi

On Fri, Dec 18, 2015 at 7:01 AM, Michal Haris <michal.ha...@visualdna.com> wrote:

> Ah, that's unfortunate. Basically we have an existing stream-processing
> pipeline which relies on different partitioning schemes, and we are writing
> some upstream Samza jobs. The only way to get it to write in a particular
> partitioning scheme is then to write a different KafkaSystemFactory, right?
> Or perhaps patch the existing one? I don't see a reason why it has to
> always use the default partitioning.
>
> On 17 December 2015 at 07:59, Yi Pan <nickpa...@gmail.com> wrote:
>
> > Hi, Michal,
> >
> > Sorry to reply late. Actually, you are right that the "partitioner.class"
> > configuration is not used in Samza to determine the outgoing partition. In
> > Samza, the partition is determined by the following code sections:
> >
> > {code}
> > val topicName = envelope.getSystemStream.getStream
> > val partitions: java.util.List[PartitionInfo] = producer.partitionsFor(topicName)
> > // Samza picks the partition itself only when the envelope carries a partition key;
> > // otherwise it passes null and leaves the choice to the Kafka producer.
> > val partitionKey = if (envelope.getPartitionKey != null)
> >   KafkaUtil.getIntegerPartitionKey(envelope, partitions) else null
> > val record = new ProducerRecord(envelope.getSystemStream.getStream,
> >   partitionKey,
> >   envelope.getKey.asInstanceOf[Array[Byte]],
> >   envelope.getMessage.asInstanceOf[Array[Byte]])
> > {code}
> >
> > {code}
> > def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope,
> >     partitions: java.util.List[PartitionInfo]): Integer = {
> >   val numPartitions = partitions.size
> >   // Hash the partition key (not the message key) and take it modulo the partition count.
> >   abs(envelope.getPartitionKey.hashCode()) % numPartitions
> > }
> > {code}
> >
> > Hence, the partitioner.class in the producer configuration is not used.
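The two excerpts above reduce to one formula. The sketch below restates it without Samza or Kafka dependencies so its behaviour is easy to check; `SamzaPartitionSketch` and `integerPartitionKey` are hypothetical names. A String key goes through `String.hashCode`, a `java.lang.Integer` key hashes to its own value, and because `abs(Int.MinValue)` overflows and stays negative, that one hash value produces a negative result.

```scala
import scala.math.abs

// Standalone restatement of the arithmetic in getIntegerPartitionKey above,
// taking the partition key and partition count directly (no Samza/Kafka needed).
object SamzaPartitionSketch {
  def integerPartitionKey(partitionKey: Any, numPartitions: Int): Int =
    abs(partitionKey.hashCode()) % numPartitions

  def main(args: Array[String]): Unit = {
    println(integerPartitionKey("abc", 4))        // "abc".hashCode == 96354, so 2
    println(integerPartitionKey(5, 8))            // an Integer key hashes to itself: 5
    println(integerPartitionKey(Int.MinValue, 3)) // abs(Int.MinValue) overflows: -2
  }
}
```

Since the result depends only on `hashCode` and the partition count, keys with equal hash codes always share a partition, and the mapping shifts whenever the topic's partition count changes.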
> >
> > On Fri, Dec 11, 2015 at 4:46 AM, Michal Hariš <michal.har...@gmail.com>
> > wrote:
> >
> > > Hi all, I am not sure if this is the right mailing list, but
> > > us...@samza.apache.org doesn't seem to exist.
> > >
> > > I am just looking at the code of KafkaSystemProducer and am a bit confused
> > > as to how the partitioning at Samza output is handled.
> > >
> > > Firstly, it seems to be hard-coded to take a modulo of the hashCode of the
> > > envelope partitionKey if one is provided, otherwise null, which means that
> > > it hands the partitioning decision to the underlying Kafka producer.
> > >
> > > Now when I try to override `systems.<..>.producer.partitioner.class`, I see
> > > a warning during initialization that partitioner.class is not a known
> > > config. However, the Samza configuration docs say that any configuration
> > > available for the Kafka producer can be passed via `systems.<..>.producer...`.
> > > I have checked that both the new and the old Kafka producer APIs have
> > > `partitioner.class` configurable.
> > >
> > > I think I am missing something, or else it means that Samza doesn't allow
> > > for custom partitioning strategies at the output to Kafka.
> > >
> > > Michal,
>
> --
> Michal Haris
> Technical Architect
> direct line: +44 (0) 207 749 0229
> www.visualdna.com | t: +44 (0) 207 734 7033
> 31 Old Nichol Street
> London
> E2 7HR
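Michal's proposal (a different or patched KafkaSystemFactory) amounts to making the partition computation pluggable. A minimal sketch, assuming a hypothetical `OutputPartitioner` trait that a patched producer would call in place of `KafkaUtil.getIntegerPartitionKey`. The trait, `HashCodePartitioner`, `PrefixPartitioner`, `PatchedProducerSketch.partitionFor` and the `"7:order-123"` key format are all invented for illustration and are not Samza APIs. How an implementation would be selected from configuration is not shown.

```scala
import scala.math.abs

// Hypothetical extension point: nothing like this exists in the quoted
// KafkaSystemProducer code; it only illustrates the "patch the existing one" idea.
trait OutputPartitioner {
  def partition(partitionKey: Any, numPartitions: Int): Int
}

// Reproduces the current behaviour: abs(hashCode) % numPartitions.
object HashCodePartitioner extends OutputPartitioner {
  def partition(partitionKey: Any, numPartitions: Int): Int =
    abs(partitionKey.hashCode()) % numPartitions
}

// Example custom scheme (hypothetical): keys look like "7:order-123" and the
// numeric prefix selects the partition. Assumes well-formed keys.
object PrefixPartitioner extends OutputPartitioner {
  def partition(partitionKey: Any, numPartitions: Int): Int =
    partitionKey.toString.takeWhile(_ != ':').toInt % numPartitions
}

object PatchedProducerSketch {
  // Mirrors the quoted logic: with no partition key the result is None
  // (null in the original), so the Kafka producer decides.
  def partitionFor(partitionKey: Any, numPartitions: Int,
                   partitioner: OutputPartitioner): Option[Int] =
    Option(partitionKey).map(k => partitioner.partition(k, numPartitions))
}
```

Keeping the null-key path unchanged means existing jobs that never set a partition key would behave exactly as before under such a patch.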