Hi, Michal,

Yeah, unfortunately, based on the current code base, the only short-term
solution would be your proposal. The issues are the following:
1. The Kafka producer only applies the specified partitioner.class to the
ProducerRecord key. Samza allows the user to specify a partition key that
is different from the ProducerRecord key, which the Kafka producer's
partitioner.class does not support.
2. The Kafka producer made a backward-incompatible change in the default
Partitioner from 0.8.1 to 0.8.2.
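
To make point 1 concrete, here is a minimal, self-contained sketch of that
behavior. It uses no Kafka or Samza classes: PartitionSketch, PartitionFn,
hashModulo and choosePartition are hypothetical names for illustration only,
and the hash-modulo rule is modeled on the getIntegerPartitionKey snippet
quoted further down in this thread. The optional scheme argument stands in
for the kind of hook a patched or replacement KafkaSystemFactory might
expose; it is an assumption, not an existing Samza API.

```scala
object PartitionSketch {
  // Hypothetical type for a pluggable scheme:
  // (partition key, number of partitions) => partition number.
  type PartitionFn = (Any, Int) => Int

  // Same rule as the quoted getIntegerPartitionKey: hashCode modulo partition count.
  val hashModulo: PartitionFn =
    (partitionKey, numPartitions) => math.abs(partitionKey.hashCode()) % numPartitions

  // Returns null when no partition key is set, which leaves the choice to the
  // underlying producer; otherwise the partition is fixed here, before any
  // record would be built.
  def choosePartition(partitionKey: Any,
                      numPartitions: Int,
                      scheme: PartitionFn = hashModulo): Integer =
    if (partitionKey != null) Integer.valueOf(scheme(partitionKey, numPartitions))
    else null
}
```

Whenever a partition key is set, the partition number is already decided
before the ProducerRecord is created, so a configured partitioner class
never gets a say. That is why a custom scheme would have to be plugged in at
this step.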

There is another JIRA ticket (SAMZA-839) opened to track this issue. I
would suggest that you join the discussion and keep watching this ticket.

Thanks!

-Yi

On Fri, Dec 18, 2015 at 7:01 AM, Michal Haris <michal.ha...@visualdna.com>
wrote:

> Ah, that's unfortunate. Basically we have an existing stream-processing
> pipeline which relies on different partitioning schemes and we are writing
> some upstream Samza jobs. The only way to get it to write in a particular
> partitioning scheme is then to write a different KafkaSystemFactory, right?
> Or perhaps patch the existing one? I don't see a reason why it has to
> always use the default partitioning.
>
> On 17 December 2015 at 07:59, Yi Pan <nickpa...@gmail.com> wrote:
>
> > Hi, Michal,
> >
> > Sorry for the late reply. Actually, you are right that the
> > "partitioner.class" configuration is not used in Samza to determine the
> > outgoing partition. In Samza, the partition is defined by the following
> > code sections:
> > {code}
> >
> > val topicName = envelope.getSystemStream.getStream
> > val partitions: java.util.List[PartitionInfo] =
> >   producer.partitionsFor(topicName)
> > val partitionKey = if (envelope.getPartitionKey != null)
> >   KafkaUtil.getIntegerPartitionKey(envelope, partitions) else null
> > val record = new ProducerRecord(envelope.getSystemStream.getStream,
> >                                 partitionKey,
> >                                 envelope.getKey.asInstanceOf[Array[Byte]],
> >                                 envelope.getMessage.asInstanceOf[Array[Byte]])
> >
> > {code}
> >
> > {code}
> >
> > def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope,
> > partitions: java.util.List[PartitionInfo]): Integer = {
> >   val numPartitions = partitions.size
> >   abs(envelope.getPartitionKey.hashCode()) % numPartitions
> > }
> >
> > {code}
> >
> >
> > Hence, the partitioner.class in the producer configuration is not used.
> >
> >
> > On Fri, Dec 11, 2015 at 4:46 AM, Michal Hariš <michal.har...@gmail.com>
> > wrote:
> >
> > > Hi all, I am not sure if this is the right mailing list but
> > > us...@samza.apache.org doesn't seem to exist.
> > >
> > > I am just looking at the code of KafkaSystemProducer and am a bit confused
> > > as to how the partitioning at Samza output is handled.
> > >
> > > Firstly, it seems to be hard-coded to take a modulo of the hashCode of the
> > > envelope partitionKey if one is provided, and otherwise to pass null, which
> > > means that it hands the partitioning decision to the underlying Kafka
> > > producer.
> > >
> > > Now when I try to override `systems.<..>.producer.partitioner.class` I see
> > > a warning in the initialization that partitioner.class is not a known
> > > config. However, the configuration for Samza says that any configuration
> > > available for the Kafka producer can be passed to
> > > `systems.<..>.producer...`. I have checked that both the new and old Kafka
> > > producer APIs have `partitioner.class` configurable.
> > >
> > > I think I am missing something, or else it means that Samza doesn't allow
> > > for custom partitioning strategies at the output to Kafka.
> > >
> > > Michal,
> > >
> >
>
>
>
> --
> Michal Haris
> Technical Architect
> direct line: +44 (0) 207 749 0229
> www.visualdna.com | t: +44 (0) 207 734 7033
> 31 Old Nichol Street
> London
> E2 7HR
>
