Hey Tyson,

"""I'm trying to have multiple tasks instances run, each processing a separate partition, and it appears that only a single task instance runs, processing all partitions. Or else my partitions are not created properly."""
Are you trying to consume a single stream that has multiple partitions, or are you processing multiple streams that each have one partition? If it's the latter, all of these messages will get routed to a single task instance. There is an upcoming patch to allow alternative partition-to-task-instance mappings (SAMZA-71), which Jakob Homan is currently working on. If you have a single input stream with multiple partitions, you should end up with one task instance per partition. This partitioning model is explained in some detail at the 20-minute mark in this talk:

http://www.infoq.com/presentations/samza-linkedin

"""the example in docs of collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems confusing, because the OutgoingMessageEnvelope constructor has a key AND partitionKey - I assume the partitionKey should be msg.get("user_id") in this case, but what should key be? Just a unique value for this message?"""

The OutgoingMessageEnvelope has several constructors. The two you're referring to are:

  public OutgoingMessageEnvelope(SystemStream systemStream, Object partitionKey, Object key, Object message)
  public OutgoingMessageEnvelope(SystemStream systemStream, Object key, Object message)

This is, indeed, odd. In general, people only want the second constructor (systemStream, key, message). With that constructor, the key is also used as the partition key, so in the docs example msg.get("user_id") serves as both.

The constructor with the partitionKey has its origins in the Kafka API. With Kafka 0.8, keys are stored along with messages in the actual log segments on disk. This is useful because it means you can get access to the key information that was sent with the message. It also means that you can use log compaction to de-duplicate keys in a Kafka topic (a 0.8.1 feature). There are some cases where you might wish to partition a topic by one key (say, member ID), but store (or de-duplicate by) a different key with the message.
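To make the partitionKey/key split concrete, here is a minimal, self-contained Java sketch. It deliberately avoids Samza and Kafka classes: the Envelope class and the partitionFor, route, and compact helpers are hypothetical stand-ins. It assumes hash partitioning in the spirit of Kafka 0.8's default partitioner (a non-negative hash of the partition key modulo the partition count) and models log compaction as "keep the latest message per key within a partition"; the exact hashing applied to serialized keys in a real job may differ. Note the argument order in the four-argument constructor: partitionKey comes before key, so a call shaped like new OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey, outgoingMap) would, if the variables mean what their names suggest, partition by msgKey.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionSketch {

    // Hypothetical stand-in for an outgoing envelope's fields:
    // the partition key picks the partition, the key is stored with the message.
    static final class Envelope {
        final Object partitionKey;
        final Object key;
        final Object message;

        Envelope(Object partitionKey, Object key, Object message) {
            this.partitionKey = partitionKey;
            this.key = key;
            this.message = message;
        }
    }

    // Assumed hash partitioning: non-negative hash of the partition key,
    // modulo the number of partitions.
    static int partitionFor(Object partitionKey, int numPartitions) {
        return (partitionKey.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Routes each envelope to a partition (modeled as a list) by its partition key.
    static List<List<Envelope>> route(List<Envelope> envelopes, int numPartitions) {
        List<List<Envelope>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Envelope e : envelopes) {
            partitions.get(partitionFor(e.partitionKey, numPartitions)).add(e);
        }
        return partitions;
    }

    // Simplified log compaction: within one partition, keep only the latest
    // message for each stored key (the key, not the partition key).
    static Map<Object, Object> compact(List<Envelope> partition) {
        Map<Object, Object> latest = new LinkedHashMap<>();
        for (Envelope e : partition) {
            latest.put(e.key, e.message);
        }
        return latest;
    }

    public static void main(String[] args) {
        // Hypothetical data: partitioned by member ID, keyed by member ID plus field.
        List<Envelope> sent = List.of(
            new Envelope("member-1", "member-1:email", "a@example.com"),
            new Envelope("member-1", "member-1:email", "b@example.com"),
            new Envelope("member-1", "member-1:title", "Engineer"),
            new Envelope("member-2", "member-2:email", "c@example.com"));

        List<List<Envelope>> partitions = route(sent, 4);
        for (int p = 0; p < partitions.size(); p++) {
            System.out.println("partition " + p + ": " + compact(partitions.get(p)));
        }
    }
}
```

All of member-1's messages land in one partition because they share a partition key, while compaction collapses only the two messages that share the stored key member-1:email.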
"""Do I need to specify partition manager and yarn.container.count to get multiple instances of my task working to service separate partitions?"""

The partition manager (the KafkaPartitionManager class you couldn't find) has been replaced by KafkaSystemFactory and KafkaSystemAdmin. As long as you've specified a KafkaSystemFactory, Samza will handle the partitions properly.

The yarn.container.count setting simply specifies how many containers (Java processes) your tasks run in. If you have only one TaskInstance but specify a container count of 2, the second container won't have any partitions to process, and I believe the job will fail. You need to set your container count <= the partition count of your input topics.

Cheers,
Chris

On 4/11/14 12:55 PM, "Tyson Norris" wrote:

>Possibly related - I cannot seem to find the source for
>KafkaPartitionManager - can someone point me to it?
>
>Thanks
>Tyson
>
>On Apr 11, 2014, at 12:15 PM, Tyson Norris wrote:
>
>> Hi -
>> I have a couple questions about partitioning - I'm trying to have
>>multiple tasks instances run, each processing a separate partition, and
>>it appears that only a single task instance runs, processing all
>>partitions. Or else my partitions are not created properly. This is
>>based on a modified version of hello-samza, so I'm not sure exactly
>>which config+code steps to take to enable partitioning of message to
>>multiple instances of the same task.
>>
>> To route to a partition i use: messageCollector.send(new
>>OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey,
>>outgoingMap));
>> - question here: the example in docs of collector.send(new
>>OutgoingMessageEnvelope(new SystemStream("kafka",
>>"SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems
>>confusing, because the OutgoingMessageEnvelope constructor has a key AND
>>partitionKey - I assume the partitionKey should be msg.get("user_id") in
>>this case, but what should key be? Just a unique value for this message?
>>
>> I tried the 3 parameter constructor as well and have similar problems,
>>where my single task instance is used regardless of partitionKey
>>specified in the OutgoingMessageEnvelope.
>>
>> Do I need to specify partition manager and yarn.container.count to get
>>multiple instances of my task working to service separate partitions?
>>
>> I'm not sure how to tell if my messages are routed to the correct
>>partition in kafka, or whether the problem is a partition handling
>>config in samza.
>>
>> Any advice is appreciated!
>>
>> Thanks
>> Tyson
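The rule that the container count should not exceed the input partition count can be illustrated with a small Java sketch. It is a simplified model, not Samza's actual assignment code: it assumes one task instance per input partition and spreads task instances round-robin across containers. The class name ContainerSketch and its methods assign and hasIdleContainer are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class ContainerSketch {

    // Simplified model (an assumption, not Samza's real logic): one task
    // instance per input partition, assigned round-robin to containers.
    static List<List<Integer>> assign(int partitionCount, int containerCount) {
        List<List<Integer>> containers = new ArrayList<>();
        for (int c = 0; c < containerCount; c++) {
            containers.add(new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            // The task instance for this partition goes to container (partition mod containerCount).
            containers.get(partition % containerCount).add(partition);
        }
        return containers;
    }

    // True if some container ends up with no task instances, the
    // misconfiguration that happens when containers outnumber partitions.
    static boolean hasIdleContainer(int partitionCount, int containerCount) {
        for (List<Integer> tasks : assign(partitionCount, containerCount)) {
            if (tasks.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(assign(4, 2));           // [[0, 2], [1, 3]]
        System.out.println(assign(1, 2));           // [[0], []]
        System.out.println(hasIdleContainer(1, 2)); // true
    }
}
```

With one partition and two containers, the second container receives nothing to process, which matches the single-TaskInstance, container-count-of-2 case described in the reply.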
