I defined a customized partitioner:
class ExpertSearchCollectionPartitioner implements Partitioner<Long> {
@Override
public int partition(Long key, int numPartitions) {
System.out.print(" partition: " + key + ", " + numPartitions +
", "+ (key.hashCode() % numPartitions) +
"\n");
return (int) (key % (long) numPartitions);
}
};
and in the producer:
props.put("partitioner.class", ExpertSearchCollectionPartitioner.class
.getName());
producer = new Producer<Long, String>(new ProducerConfig(props));
producer.send(new KeyedMessage<Long, String>(
KAFKA_EXPERTSEARCHCOLLECTION_TOPIC,
new Long(1),
new String("I love kafka" + count)));
however, when I run the producer, I always got the following error:
Exception in thread "main" java.lang.NoSuchMethodException:
com.twitter.expertsearch.indexing.jit.ExpertSearchCollectionPartitioner.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:2721)
at java.lang.Class.getConstructor(Class.java:1674)
at kafka.utils.Utils$.createObject(Utils.scala:457)
at kafka.producer.Producer.<init>(Producer.scala:61)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:25)
at
com.twitter.expertsearch.indexing.jit.ExpertSearchCollectionKafkaProducer.<init>(ExpertSearchCollectionKafkaProducer.java:70)
at
com.twitter.expertsearch.indexing.jit.ExpertSearchCollectionKafkaProducer.main(ExpertSearchCollectionKafkaProducer.java:91)
May I ask what I am missing? Is there anywhere an example java code to
customize the partitioner?
Thanks alot!
JAne