Have you set a serde (serializer/deserializer) for the key? You need to tell
Samza how to convert a string to bytes and back. Something like this:

serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
systems.kafka.samza.key.serde=string
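
For example, once that serde is registered, the task can pass the String
straight through as the key. A minimal sketch ("my-output-topic" is a
placeholder stream name, and collector / partitionKey / outgoingMap stand in
for the variables in your own task):

    // Samza runs the key through the registered "string" serde to get bytes
    // before handing it to the Kafka producer, so no more ClassCastException.
    SystemStream output = new SystemStream("kafka", "my-output-topic");
    collector.send(new OutgoingMessageEnvelope(output, partitionKey, outgoingMap));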

Martin

On 15 Apr 2014, at 06:56, Tyson Norris <[email protected]> wrote:
> Hi - 
> Just following up on this, I ran into a couple of other problems, as I am
> trying to use a String for the partition key.
> 
> 
> Passing String types for either key or partitionKey generated:
> KafkaSystemProducer [WARN] Triggering a reconnect for kafka because 
> connection failed: java.lang.ClassCastException: java.lang.String cannot be 
> cast to [B
> 
> I was able to configure kafka with:
> partitioner.class=kafka.producer.DefaultPartitioner
> 
> In which case I can now use string for partitionKey and byte[] for key, which 
> worked (yay!). 
> 
> However, if I change it to pass only a key, it still fails with
> java.lang.ClassCastException: java.lang.String cannot be cast to [B
> I also tried specifying in the kafka producer.properties:
> key.serializer.class=kafka.serializer.StringEncoder
> 
> but had the same results. 
> 
> Is there something special I need to do to use String type for key when using:
> public OutgoingMessageEnvelope(SystemStream systemStream,
>                               java.lang.Object key,
>                               java.lang.Object message)
> 
> Thanks
> Tyson
> 
> On Apr 14, 2014, at 12:58 PM, Chris Riccomini <[email protected]> wrote:
> 
>> Hey Tyson,
>> 
>> Yeah, sorry this is not more clear. Physical Kafka partitions are
>> per-topic. The default partition count for a newly created topic in Kafka
>> is defined using the num.partitions setting, as you've discovered. The
>> default setting that Kafka ships with is 1. This can be overridden on a
>> per-topic basis by using the kafka-create-topic.sh tool.
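>> 
>> For example, something along these lines creates a topic with more than one
>> partition up front (a sketch of the 0.8-era tooling; the topic name and
>> partition count are placeholders, and the exact flags depend on your Kafka
>> version):
>> 
>>   bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 \
>>     --partition 8 --topic my-input-topic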
>> 
>> Cheers,
>> Chris
>> 
>> On 4/14/14 12:47 PM, "Tyson Norris" <[email protected]> wrote:
>> 
>>> OK that was wrong as well (my SystemFactory is indeed supposed to use a
>>> single partition, just not the kafka system factory), but I finally got
>>> things working as expected with some kafka config changes.
>>> 
>>> For now, I set in deploy/kafka/config/server.properties:
>>> num.partitions=5
>>> 
>>> (although I assume there is a better per-topic value I should set instead
>>> of a default like this)
>>> 
>>> And now I see multiple task instances created as desired.
>>> 
>>> Thanks
>>> Tyson
>>> 
>>> On Apr 14, 2014, at 12:36 PM, Tyson Norris <[email protected]> wrote:
>>> 
>>>> OK sorry for the noise.
>>>> I stumbled upon another clue - my SystemFactory has (based on
>>>> WikipediaSystemFactory) :
>>>>  @Override
>>>>  public SystemAdmin getAdmin(String systemName, Config config) {
>>>>      return new SinglePartitionWithoutOffsetsSystemAdmin();
>>>>  }
>>>> 
>>>> Which I guess is a good reason my system is only using a single
>>>> partition. Doh.
>>>> I will work on a new SystemFactory impl to test with...
>>>> 
>>>> Thanks
>>>> Tyson
>>>> 
>>>> On Apr 14, 2014, at 12:20 PM, Tyson Norris <[email protected]> wrote:
>>>> 
>>>>> I am actually wondering if I’m missing a bit of configuration that
>>>>> indicates the number of partitions I want to create for various kafka
>>>>> topics that messages are sent to?
>>>>> 
>>>>> I don’t see where this should be added in the config, and it appears
>>>>> the partitions are not created automatically when I specify the key for
>>>>> partitioning.
>>>>> 
>>>>> 
>>>>> Thanks
>>>>> Tyson
>>>>> 
>>>>> 
>>>>> 
>>>>> On Apr 14, 2014, at 10:35 AM, Tyson Norris
>>>>> <[email protected]> wrote:
>>>>> 
>>>>> Hi Chris -
>>>>> 
>>>>> On Apr 14, 2014, at 9:13 AM, Chris Riccomini
>>>>> <[email protected]> wrote:
>>>>> 
>>>>> Hey Tyson,
>>>>> 
>>>>> """I¹m trying to have multiple tasks instances run, each processing a
>>>>> separate partition, and it appears that only a single task instance
>>>>> runs,
>>>>> processing all partitions. Or else my partitions are not created
>>>>> properly."""
>>>>> 
>>>>> Are you trying to consume a single stream that has multiple partitions,
>>>>> or are you processing multiple streams that all have one partition? If
>>>>> it's the latter, all of these messages will get routed to a single task
>>>>> instance. There is an upcoming patch to allow alternative partition task
>>>>> instance mappings (SAMZA-71), which Jakob Homan is working on currently.
>>>>> 
>>>>> No I’m trying for the former, although my SystemConsumer is set up
>>>>> like the latter.  That is, I have a system consumer that should
>>>>> generate messages in a single partition, and a task that takes all
>>>>> messages and splits them into multiple partitions.
>>>>> 
>>>>> So, in my SystemConsumer I have:
>>>>>    SystemStreamPartition systemStreamPartition =
>>>>>        new SystemStreamPartition(systemName, streamId, new Partition(0));
>>>>>    try {
>>>>>        put(systemStreamPartition,
>>>>>            new IncomingMessageEnvelope(systemStreamPartition, null, null, object));
>>>>> 
>>>>> which generates messages on the same stream + partition.
>>>>> 
>>>>> Then in my first task I have:
>>>>>    messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>>>>>        partitionKey.getBytes(), outgoingMap));
>>>>> 
>>>>> which I am trying to get routed to separate task instances based on
>>>>> partitionKey.
>>>>> 
>>>>> 
>>>>> If you have a single input stream with multiple partitions, you should
>>>>> end up with one task instance per partition. This partitioning model is
>>>>> explained in some detail at the 20 minute mark in this talk:
>>>>> 
>>>>> http://www.infoq.com/presentations/samza-linkedin
>>>>> 
>>>>> """the example in docs of collector.send(new
>>>>> OutgoingMessageEnvelope(new
>>>>> SystemStream("kafka", "SomeTopicPartitionedByUserId"),
>>>>> msg.get("user_id"),
>>>>> msg)) seems confusing, because the OutgoingMessageEnvelope constructor
>>>>> has
>>>>> a key AND partitionKey - I assume the partitionKey should be
>>>>> msg.get(³user_id²) in this case, but what should key be? Just a unique
>>>>> value for this message?"""
>>>>> 
>>>>> The OutgoingMessageEnvelope has several constructors. The two you're
>>>>> referring to are:
>>>>> 
>>>>> public OutgoingMessageEnvelope(SystemStream systemStream,
>>>>>     Object partitionKey, Object key, Object message)
>>>>> 
>>>>> public OutgoingMessageEnvelope(SystemStream systemStream,
>>>>>     Object key, Object message)
>>>>> 
>>>>> 
>>>>> This is, indeed, odd. In general, people only want the second
>>>>> constructor (systemStream, key, message). The constructor with the
>>>>> partitionKey has its origins in the Kafka API. With Kafka 0.8, keys are
>>>>> now stored along with messages in the actual log segments on disk. This
>>>>> is useful because it means you can get access to the key information
>>>>> that was sent with the message. It also means that you can use log
>>>>> compaction to de-duplicate keys in a Kafka topic (a 0.8.1 feature).
>>>>> There are some cases where you might wish to partition a topic by one
>>>>> key (say, member ID), but store (or de-duplicate by) a different key
>>>>> with the message.
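>>>>> 
>>>>> For illustration, a rough sketch of both forms (the topic name, memberId,
>>>>> dedupeKey and msg are made-up placeholders, not from your job):
>>>>> 
>>>>>    SystemStream out = new SystemStream("kafka", "SomeTopicPartitionedByUserId");
>>>>>    // partition by member ID, but store a separate key with the message
>>>>>    collector.send(new OutgoingMessageEnvelope(out, memberId, dedupeKey, msg));
>>>>>    // common case: a single key, which also picks the partition when no
>>>>>    // partitionKey is given
>>>>>    collector.send(new OutgoingMessageEnvelope(out, memberId, msg));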
>>>>> 
>>>>> 
>>>>> So in the case where I don’t care about deduplication, is the second
>>>>> constructor’s “key” parameter actually used as the partition key?
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> """Do I need to specify partition manager and yarn.container.count to
>>>>> get
>>>>> multiple instances of my task working to service separate
>>>>> partitions?"""
>>>>> 
>>>>> This class has been replaced by the KafkaSystemFactory and
>>>>> KafkaSystemAdmin. As long as you've specified a KafkaSystemFactory, the
>>>>> partitions will be handled properly by Samza. The yarn.container.count
>>>>> simply specifies how many containers (java processes) you get to run
>>>>> your
>>>>> tasks in. If you have only one TaskInstance, but specify a container
>>>>> count
>>>>> of 2, the second container won't have any partitions to process, and I
>>>>> believe the job will fail. You need to set your container count <=  the
>>>>> partition count of your input topics.
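>>>>> 
>>>>> As a rough illustration of that constraint (the topic name and counts
>>>>> here are invented): with an input topic that has 4 partitions, Samza
>>>>> creates 4 TaskInstances, so any container count from 1 to 4 works, e.g.:
>>>>> 
>>>>>    # input topic "my-input-topic" was created with 4 partitions
>>>>>    task.inputs=kafka.my-input-topic
>>>>>    # 2 containers, each running 2 of the 4 TaskInstances
>>>>>    yarn.container.count=2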
>>>>> 
>>>>> Ok, so it sounds like I should be able to have multiple task instances
>>>>> in a single container, if the partitioning works.
>>>>> 
>>>>> Thanks!
>>>>> Tyson
>>>>> 
>>>>> 
>>>>> 
>>>>> Cheers,
>>>>> Chris
>>>>> 
>>>>> On 4/11/14 12:55 PM, "Tyson Norris"
>>>>> <[email protected]> wrote:
>>>>> 
>>>>> Possibly related - I cannot seem to find the source for
>>>>> KafkaPartitionManager - can someone point me to it?
>>>>> 
>>>>> Thanks
>>>>> Tyson
>>>>> 
>>>>> On Apr 11, 2014, at 12:15 PM, Tyson Norris
>>>>> <[email protected]> wrote:
>>>>> 
>>>>> Hi -
>>>>> I have a couple of questions about partitioning - I'm trying to have
>>>>> multiple task instances run, each processing a separate partition, and
>>>>> it appears that only a single task instance runs, processing all
>>>>> partitions. Or else my partitions are not created properly. This is
>>>>> based on a modified version of hello-samza, so I'm not sure exactly
>>>>> which config+code steps to take to enable partitioning of messages to
>>>>> multiple instances of the same task.
>>>>> 
>>>>> To route to a partition I use: messageCollector.send(new
>>>>> OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey,
>>>>> outgoingMap));
>>>>> - question here: the example in docs of collector.send(new
>>>>> OutgoingMessageEnvelope(new SystemStream("kafka",
>>>>> "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems
>>>>> confusing, because the OutgoingMessageEnvelope constructor has a key AND
>>>>> partitionKey - I assume the partitionKey should be msg.get("user_id") in
>>>>> this case, but what should key be? Just a unique value for this message?
>>>>> 
>>>>> I tried the 3-parameter constructor as well and have similar problems,
>>>>> where my single task instance is used regardless of the partitionKey
>>>>> specified in the OutgoingMessageEnvelope.
>>>>> 
>>>>> Do I need to specify partition manager and yarn.container.count to get
>>>>> multiple instances of my task working to service separate partitions?
>>>>> 
>>>>> I'm not sure how to tell if my messages are routed to the correct
>>>>> partition in Kafka, or whether the problem is a partition-handling
>>>>> config in Samza.
>>>>> 
>>>>> Any advice is appreciated!
>>>>> 
>>>>> Thanks
>>>>> Tyson
>>>>> 
>>>> 
>>> 
>> 
> 
