Have you set a serde (serializer/deserializer) for the key? You need to tell Samza how to go from a string to bytes, and back again. Something like this:

serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
systems.kafka.samza.key.serde=string

Martin

On 15 Apr 2014, at 06:56, Tyson Norris wrote:

> Hi -
> Just following up on this, I ran into a couple of other problems, as I am trying
> to use a String for partition key.
>
> Passing String types for either key or partitionKey generated:
> KafkaSystemProducer [WARN] Triggering a reconnect for kafka because
> connection failed: java.lang.ClassCastException: java.lang.String cannot be
> cast to [B
>
> I was able to configure kafka with:
> partitioner.class=kafka.producer.DefaultPartitioner
>
> In that case, I can now use a String for partitionKey and byte[] for key, which
> worked (yay!).
>
> However, if I change to pass only key, it still fails with the
> java.lang.ClassCastException: java.lang.String cannot be cast to [B
> I also tried specifying in the kafka producer.properties:
> key.serializer.class=kafka.serializer.StringEncoder
>
> but had the same results.
>
> Is there something special I need to do to use String type for key when using:
> public OutgoingMessageEnvelope(SystemStream systemStream,
>     java.lang.Object key,
>     java.lang.Object message)
>
> Thanks
> Tyson
>
> On Apr 14, 2014, at 12:58 PM, Chris Riccomini wrote:
>
>> Hey Tyler,
>>
>> Yeah, sorry this is not more clear. Physical Kafka partitions are
>> per-topic. The default partition count for a newly created topic in Kafka
>> is defined using the num.partitions setting, as you've discovered. The
>> default setting that Kafka ships with is 1. This can be overridden on a
>> per-topic basis by using the kafka-create-topic.sh tool.
>>
>> Cheers,
>> Chris
>>
>> On 4/14/14 12:47 PM, "Tyson Norris" wrote:
>>
>>> OK that was wrong as well (my SystemFactory is indeed supposed to use a
>>> single partition, just not the kafka system factory), but I finally got
>>> things working as expected with some kafka config changes.
>>>
>>> For now, I set in deploy/kafka/config/server.properties:
>>> num.partitions=5
>>>
>>> (although I assume there is a better per-topic value I should set instead
>>> of a default like this)
>>>
>>> And now I see multiple task instances created as desired.
>>>
>>> Thanks
>>> Tyson
>>>
>>> On Apr 14, 2014, at 12:36 PM, Tyson Norris wrote:
>>>
>>>> OK sorry for the noise.
>>>> I stumbled upon another clue - my SystemFactory has (based on
>>>> WikipediaSystemFactory):
>>>> @Override
>>>> public SystemAdmin getAdmin(String systemName, Config config) {
>>>>   return new SinglePartitionWithoutOffsetsSystemAdmin();
>>>> }
>>>>
>>>> Which I guess is a good reason my system is only using a single
>>>> partition. Doh.
>>>> I will work on a new SystemFactory impl to test with...
>>>>
>>>> Thanks
>>>> Tyson
>>>>
>>>> On Apr 14, 2014, at 12:20 PM, Tyson Norris wrote:
>>>>
>>>>> I am actually wondering if I’m missing a bit of configuration that
>>>>> indicates the number of partitions I want to create for various kafka
>>>>> topics that messages are sent to?
>>>>>
>>>>> I don’t see where this should be added in the config, and it appears
>>>>> the partitions are not created automatically when I specify the key for
>>>>> partitioning.
>>>>>
>>>>> Thanks
>>>>> Tyson
>>>>>
>>>>> On Apr 14, 2014, at 10:35 AM, Tyson Norris wrote:
>>>>>
>>>>> Hi Chris -
>>>>>
>>>>> On Apr 14, 2014, at 9:13 AM, Chris Riccomini wrote:
>>>>>
>>>>> Hey Tyler,
>>>>>
>>>>> """I’m trying to have multiple task instances run, each processing a
>>>>> separate partition, and it appears that only a single task instance
>>>>> runs, processing all partitions. Or else my partitions are not created
>>>>> properly."""
>>>>>
>>>>> Are you trying to consume a single stream that has multiple partitions,
>>>>> or are you processing multiple streams that all have one partition? If
>>>>> it's the latter, all of these messages will get routed to a single task
>>>>> instance. There is an upcoming patch to allow alternative
>>>>> partition-to-task-instance mappings (SAMZA-71), which Jakob Homan is
>>>>> working on currently.
>>>>>
>>>>> No, I’m trying for the former, although my SystemConsumer is set up
>>>>> like the latter. That is, I have a system consumer that should
>>>>> generate messages in a single partition, and a task that takes all
>>>>> messages and splits them into multiple partitions.
>>>>>
>>>>> So, in my SystemConsumer I have:
>>>>> SystemStreamPartition systemStreamPartition = new
>>>>>     SystemStreamPartition(systemName, streamId, new Partition(0));
>>>>> try {
>>>>>   put(systemStreamPartition, new
>>>>>       IncomingMessageEnvelope(systemStreamPartition, null, null, object));
>>>>>
>>>>> which generates messages on the same stream + partition.
>>>>>
>>>>> Then in my first task I have:
>>>>> messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>>>>>     partitionKey.getBytes(), outgoingMap));
>>>>>
>>>>> which I am trying to get routed to separate task instances based on
>>>>> partitionKey.
>>>>>
>>>>> If you have a single input stream with multiple partitions, you should
>>>>> end up with one task instance per partition. This partitioning model is
>>>>> explained in some detail at the 20 minute mark in this talk:
>>>>>
>>>>> http://www.infoq.com/presentations/samza-linkedin
>>>>>
>>>>> """the example in docs of collector.send(new OutgoingMessageEnvelope(new
>>>>> SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"),
>>>>> msg)) seems confusing, because the OutgoingMessageEnvelope constructor
>>>>> has a key AND partitionKey - I assume the partitionKey should be
>>>>> msg.get("user_id") in this case, but what should key be? Just a unique
>>>>> value for this message?"""
>>>>>
>>>>> The OutgoingMessageEnvelope has several constructors. The two you're
>>>>> referring to are:
>>>>>
>>>>> public OutgoingMessageEnvelope(SystemStream systemStream, Object partitionKey, Object key, Object message)
>>>>>
>>>>> public OutgoingMessageEnvelope(SystemStream systemStream, Object key, Object message)
>>>>>
>>>>> This is, indeed, odd. In general, people only want the second
>>>>> constructor (systemStream, key, message). The constructor with the
>>>>> partitionKey has its origins in the Kafka API. With Kafka 0.8, keys are
>>>>> now stored along with messages in the actual log segments on the disk.
>>>>> This is useful because it means you can get access to the key
>>>>> information that was sent with the message. It also means that you can
>>>>> use log compaction to de-duplicate keys in a Kafka topic (an 0.8.1
>>>>> feature). There are some cases where you might wish to partition a topic
>>>>> by one key (say, member ID), but store (or de-duplicate by) a different
>>>>> key with the message.
>>>>>
>>>>> So in the case where I don’t care about deduplication, is the second
>>>>> constructor “key” parameter actually used as partition key?
>>>>>
>>>>> """Do I need to specify partition manager and yarn.container.count to
>>>>> get multiple instances of my task working to service separate
>>>>> partitions?"""
>>>>>
>>>>> That class (the partition manager) has been replaced by the
>>>>> KafkaSystemFactory and KafkaSystemAdmin. As long as you've specified a
>>>>> KafkaSystemFactory, the partitions will be handled properly by Samza.
>>>>> The yarn.container.count simply specifies how many containers (Java
>>>>> processes) you get to run your tasks in. If you have only one
>>>>> TaskInstance, but specify a container count of 2, the second container
>>>>> won't have any partitions to process, and I believe the job will fail.
>>>>> You need to set your container count <= the partition count of your
>>>>> input topics.
>>>>>
>>>>> Ok, so it sounds like I should be able to have multiple task instances
>>>>> in a single container, if the partitioning works.
>>>>>
>>>>> Thanks!
>>>>> Tyson
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On 4/11/14 12:55 PM, "Tyson Norris" wrote:
>>>>>
>>>>> Possibly related - I cannot seem to find the source for
>>>>> KafkaPartitionManager - can someone point me to it?
>>>>>
>>>>> Thanks
>>>>> Tyson
>>>>>
>>>>> On Apr 11, 2014, at 12:15 PM, Tyson Norris wrote:
>>>>>
>>>>> Hi -
>>>>> I have a couple questions about partitioning - I’m trying to have
>>>>> multiple task instances run, each processing a separate partition, and
>>>>> it appears that only a single task instance runs, processing all
>>>>> partitions. Or else my partitions are not created properly. This is
>>>>> based on a modified version of hello-samza, so I’m not sure exactly
>>>>> which config+code steps to take to enable partitioning of messages to
>>>>> multiple instances of the same task.
>>>>>
>>>>> To route to a partition I use:
>>>>> messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey,
>>>>>     partitionKey, outgoingMap));
>>>>> - question here: the example in docs of collector.send(new
>>>>> OutgoingMessageEnvelope(new SystemStream("kafka",
>>>>> "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems
>>>>> confusing, because the OutgoingMessageEnvelope constructor has a key
>>>>> AND partitionKey - I assume the partitionKey should be
>>>>> msg.get("user_id") in this case, but what should key be? Just a unique
>>>>> value for this message?
>>>>>
>>>>> I tried the 3-parameter constructor as well and have similar problems,
>>>>> where my single task instance is used regardless of the partitionKey
>>>>> specified in the OutgoingMessageEnvelope.
>>>>>
>>>>> Do I need to specify partition manager and yarn.container.count to get
>>>>> multiple instances of my task working to service separate partitions?
>>>>>
>>>>> I’m not sure how to tell if my messages are routed to the correct
>>>>> partition in kafka, or whether the problem is a partition handling
>>>>> config in samza.
>>>>>
>>>>> Any advice is appreciated!
>>>>>
>>>>> Thanks
>>>>> Tyson
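The key serde Martin suggests sits between the String key a task emits and the byte[] the Kafka producer expects. The self-contained Java sketch below (it does not use Samza) shows that failure mode and the fix side by side. `KeySerde`, `Utf8StringSerde`, `byteArrayOnlyEncoder`, `sendWithoutSerde`, and `sendWithSerde` are hypothetical names: the interface only mirrors the toBytes/fromBytes shape of Samza's Serde, and the encoder is a stand-in for a producer path that assumes keys are already byte[].

```java
import java.nio.charset.StandardCharsets;

final class KeySerdeDemo {
    // Hypothetical stand-in for a byte-array-only encoder: it assumes the key
    // is already byte[] and casts it, which is where
    // "java.lang.String cannot be cast to [B" comes from.
    static byte[] byteArrayOnlyEncoder(Object key) {
        return (byte[]) key;
    }

    // No key serde configured: the String key reaches the encoder unchanged.
    static byte[] sendWithoutSerde(Object key) {
        return byteArrayOnlyEncoder(key);
    }

    // Key serde configured: the key is converted to bytes before encoding.
    static byte[] sendWithSerde(String key, KeySerde<String> serde) {
        return byteArrayOnlyEncoder(serde.toBytes(key));
    }

    public static void main(String[] args) {
        KeySerde<String> serde = new Utf8StringSerde();
        byte[] encoded = sendWithSerde("user-42", serde);
        System.out.println("with serde: " + serde.fromBytes(encoded));
        try {
            sendWithoutSerde("user-42");
        } catch (ClassCastException e) {
            System.out.println("without serde: " + e.getClass().getSimpleName());
        }
    }
}

// Local interface with the same shape as Samza's Serde (toBytes/fromBytes),
// defined here so the sketch compiles without Samza on the classpath.
interface KeySerde<T> {
    byte[] toBytes(T object);

    T fromBytes(byte[] bytes);
}

// Hypothetical UTF-8 string serde, in the spirit of what the serde registered
// as "string" via StringSerdeFactory does for keys.
final class Utf8StringSerde implements KeySerde<String> {
    @Override
    public byte[] toBytes(String object) {
        return object.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

In the real job, the two config lines in Martin's reply register StringSerdeFactory under the name `string` and attach it to the key of the `kafka` system; the sketch only models the effect of that step, not where Samza performs it.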
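Chris's point that physical Kafka partitions are per-topic, with the key deciding where a message lands, can be illustrated with a hash-mod partitioner. This is a sketch under stated assumptions: `PartitionSketch`, `partitionFor`, and `partitionForBytes` are hypothetical, and the idea that Kafka 0.8's DefaultPartitioner behaves roughly like a non-negative `key.hashCode()` modulo the partition count is an assumption worth checking against the Kafka source. It also shows a Java pitfall relevant to passing `partitionKey.getBytes()`: arrays do not override `hashCode()`, so two byte[] keys with identical contents generally look like different keys to any `hashCode()`-based partitioner.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class PartitionSketch {
    // Hash-mod partition choice: non-negative hash of the key, modulo the
    // topic's partition count. The exact hash Kafka applies is not asserted here.
    static int partitionFor(Object partitionKey, int numPartitions) {
        return (partitionKey.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Content-based variant for byte[] keys: arrays inherit the identity-based
    // Object.hashCode(), so hash the array contents explicitly instead.
    static int partitionForBytes(byte[] partitionKey, int numPartitions) {
        return (Arrays.hashCode(partitionKey) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 5; // e.g. num.partitions=5, as set in the thread
        String userId = "user-42";
        byte[] a = userId.getBytes(StandardCharsets.UTF_8);
        byte[] b = userId.getBytes(StandardCharsets.UTF_8);

        // Equal Strings always map to the same partition.
        System.out.println("same partition for equal strings: "
            + (partitionFor(userId, numPartitions) == partitionFor(new String(userId), numPartitions)));
        // Two arrays with identical contents are distinct objects; their
        // identity hash codes (and therefore partitions) usually differ.
        System.out.println("identity hashes equal: " + (a.hashCode() == b.hashCode()));
        // Hashing the contents makes the choice stable.
        System.out.println("same partition for equal bytes: "
            + (partitionForBytes(a, numPartitions) == partitionForBytes(b, numPartitions)));
    }
}
```

One detail in the thread may be worth double-checking: per the four-argument signature Chris quotes, the order is (systemStream, partitionKey, key, message), whereas the call in the 11 April message passes msgKey before partitionKey.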
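Chris's rule of thumb from this thread (one task instance per input partition, yarn.container.count containers to run them in, and container count <= partition count) can be modeled in a few lines. The round-robin placement and the names `ContainerAssignmentSketch` and `assign` are hypothetical and are not Samza's actual grouping logic. The sketch only shows why a single-partition input (as with SinglePartitionWithoutOffsetsSystemAdmin) yields a single task instance, and why asking for more containers than partitions would leave a container with nothing to do, which the sketch rejects up front.

```java
import java.util.ArrayList;
import java.util.List;

final class ContainerAssignmentSketch {
    // One task instance per input partition, spread over containers round-robin.
    // Round-robin is an illustrative assumption, not Samza's grouping logic.
    static List<List<Integer>> assign(int partitionCount, int containerCount) {
        if (containerCount < 1) {
            throw new IllegalArgumentException("need at least one container");
        }
        if (containerCount > partitionCount) {
            // Some container would receive no partitions, so refuse the
            // configuration instead of starting an idle container.
            throw new IllegalArgumentException("container count " + containerCount
                + " exceeds partition count " + partitionCount);
        }
        List<List<Integer>> containers = new ArrayList<>();
        for (int c = 0; c < containerCount; c++) {
            containers.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            containers.get(p % containerCount).add(p); // task instance for partition p
        }
        return containers;
    }

    public static void main(String[] args) {
        // 5 partitions in 2 containers: several task instances per container.
        System.out.println(assign(5, 2)); // [[0, 2, 4], [1, 3]]
        try {
            assign(1, 2); // one TaskInstance, two containers
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```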
