OK, that was wrong as well (my SystemFactory is indeed supposed to use a single partition, just not the Kafka system factory), but I finally got things working as expected with some Kafka config changes.
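(Context for why the partition count mattered: Samza creates one task instance per input partition, and the partition a message lands in is derived from its key, so with a one-partition topic every key collapses onto the same task. Below is a simplified, made-up sketch of that key-to-partition mapping - not Kafka's actual partitioner, though the 0.8 default is hash-based in the same spirit.)

```java
// Simplified sketch of key-based partition assignment. This is NOT Kafka's
// actual partitioner code; Kafka 0.8's default computes hash(key) modulo the
// partition count in the same spirit. Names here are made up for illustration.
public class PartitionSketch {
    public static int partitionFor(Object partitionKey, int numPartitions) {
        // Mask off the sign bit so negative hashCode values stay in range.
        return (partitionKey.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With a one-partition topic, every key maps to partition 0, so a
        // single task instance receives everything.
        System.out.println(partitionFor("user_a", 1)); // always 0
        System.out.println(partitionFor("user_b", 1)); // always 0
        // With num.partitions=5, keys spread across partitions 0..4, giving
        // Samza up to five task instances to distribute work over.
        System.out.println(partitionFor("user_a", 5));
    }
}
```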
For now, I set num.partitions=5 in deploy/kafka/config/server.properties (although I assume there is a better per-topic value I should set instead of a broker-wide default like this), and now I see multiple task instances created as desired.

Thanks
Tyson

On Apr 14, 2014, at 12:36 PM, Tyson Norris <[email protected]> wrote:

> OK, sorry for the noise.
> I stumbled upon another clue - my SystemFactory has (based on WikipediaSystemFactory):
>
>     @Override
>     public SystemAdmin getAdmin(String systemName, Config config) {
>         return new SinglePartitionWithoutOffsetsSystemAdmin();
>     }
>
> Which I guess is a good reason my system is only using a single partition. Doh.
> I will work on a new SystemFactory impl to test with...
>
> Thanks
> Tyson
>
> On Apr 14, 2014, at 12:20 PM, Tyson Norris <[email protected]> wrote:
>
>> I am actually wondering if I'm missing a bit of configuration that indicates the number of partitions I want to create for the various Kafka topics that messages are sent to?
>>
>> I don't see where this should be added in the config, and it appears the partitions are not created automatically when I specify the key for partitioning.
>>
>> Thanks
>> Tyson
>>
>> On Apr 14, 2014, at 10:35 AM, Tyson Norris <[email protected]> wrote:
>>
>> Hi Chris -
>>
>> On Apr 14, 2014, at 9:13 AM, Chris Riccomini <[email protected]> wrote:
>>
>> Hey Tyson,
>>
>> """I'm trying to have multiple task instances run, each processing a separate partition, and it appears that only a single task instance runs, processing all partitions. Or else my partitions are not created properly."""
>>
>> Are you trying to consume a single stream that has multiple partitions, or are you processing multiple streams that all have one partition? If it's the latter, all of these messages will get routed to a single task instance.
>> There is an upcoming patch to allow alternative partition-to-task-instance mappings (SAMZA-71), which Jakob Homan is working on currently.
>>
>> No, I'm trying for the former, although my SystemConsumer is set up like the latter. That is, I have a system consumer that should generate messages in a single partition, and a task that takes all messages and splits them into multiple partitions.
>>
>> So, in my SystemConsumer I have:
>>
>>     SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, streamId, new Partition(0));
>>     try {
>>         put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, object));
>>
>> which generates messages on the same stream + partition.
>>
>> Then in my first task I have:
>>
>>     messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, partitionKey.getBytes(), outgoingMap));
>>
>> which I am trying to get routed to separate task instances based on partitionKey.
>>
>> If you have a single input stream with multiple partitions, you should end up with one task instance per partition. This partitioning model is explained in some detail at the 20-minute mark in this talk:
>>
>> http://www.infoq.com/presentations/samza-linkedin
>>
>> """the example in docs of collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems confusing, because the OutgoingMessageEnvelope constructor has a key AND partitionKey - I assume the partitionKey should be msg.get("user_id") in this case, but what should key be? Just a unique value for this message?"""
>>
>> The OutgoingMessageEnvelope has several constructors. The two you're referring to are:
>>
>>     public OutgoingMessageEnvelope(SystemStream systemStream, Object partitionKey, Object key, Object message)
>>
>>     public OutgoingMessageEnvelope(SystemStream systemStream, Object key, Object message)
>>
>> This is, indeed, odd.
>> In general, people only want the second constructor (systemStream, key, message). The constructor with the partitionKey has its origins in the Kafka API. With Kafka 0.8, keys are now stored along with messages in the actual log segments on disk. This is useful because it means you can get access to the key information that was sent with the message. It also means that you can use log compaction to de-duplicate keys in a Kafka topic (an 0.8.1 feature). There are some cases where you might wish to partition a topic by one key (say, member ID), but store (or de-duplicate by) a different key with the message.
>>
>> So in the case where I don't care about deduplication, is the second constructor's "key" parameter actually used as the partition key?
>>
>> """Do I need to specify partition manager and yarn.container.count to get multiple instances of my task working to service separate partitions?"""
>>
>> This class has been replaced by the KafkaSystemFactory and KafkaSystemAdmin. As long as you've specified a KafkaSystemFactory, the partitions will be handled properly by Samza. The yarn.container.count simply specifies how many containers (Java processes) you get to run your tasks in. If you have only one TaskInstance but specify a container count of 2, the second container won't have any partitions to process, and I believe the job will fail. You need to set your container count <= the partition count of your input topics.
>>
>> OK, so it sounds like I should be able to have multiple task instances in a single container, if the partitioning works.
>>
>> Thanks!
>> Tyson
>>
>> Cheers,
>> Chris
>>
>> On 4/11/14 12:55 PM, "Tyson Norris" <[email protected]> wrote:
>>
>> Possibly related - I cannot seem to find the source for KafkaPartitionManager - can someone point me to it?
>>
>> Thanks
>> Tyson
>>
>> On Apr 11, 2014, at 12:15 PM, Tyson Norris <[email protected]> wrote:
>>
>> Hi -
>> I have a couple of questions about partitioning. I'm trying to have multiple task instances run, each processing a separate partition, and it appears that only a single task instance runs, processing all partitions. Or else my partitions are not created properly. This is based on a modified version of hello-samza, so I'm not sure exactly which config + code steps to take to enable partitioning of messages to multiple instances of the same task.
>>
>> To route to a partition I use:
>>
>>     messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey, outgoingMap));
>>
>> - question here: the example in docs of collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems confusing, because the OutgoingMessageEnvelope constructor has a key AND partitionKey - I assume the partitionKey should be msg.get("user_id") in this case, but what should key be? Just a unique value for this message?
>>
>> I tried the 3-parameter constructor as well and have similar problems, where my single task instance is used regardless of the partitionKey specified in the OutgoingMessageEnvelope.
>>
>> Do I need to specify a partition manager and yarn.container.count to get multiple instances of my task working to service separate partitions?
>>
>> I'm not sure how to tell if my messages are routed to the correct partition in Kafka, or whether the problem is a partition-handling config in Samza.
>>
>> Any advice is appreciated!
>>
>> Thanks
>> Tyson
>
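PS - for my own notes on the key-vs-partitionKey question above, here is my current (unverified) understanding of how the two constructors relate. These are hypothetical stand-ins, not the real org.apache.samza.system classes, and the fall-back-to-key behavior is an assumption to check against the actual OutgoingMessageEnvelope source:

```java
// Hypothetical stand-ins for the real Samza classes, written only to show
// which constructor argument becomes the partition key. The 3-arg form
// falling back to the key is my unverified reading of the API discussion
// above - verify against the actual org.apache.samza.system source.
class SystemStream {
    final String system, stream;
    SystemStream(String system, String stream) {
        this.system = system;
        this.stream = stream;
    }
}

class OutgoingMessageEnvelopeSketch {
    final SystemStream systemStream;
    final Object partitionKey, key, message;

    // 4-arg form: explicit partition key, separately stored key.
    OutgoingMessageEnvelopeSketch(SystemStream ss, Object partitionKey, Object key, Object message) {
        this.systemStream = ss;
        this.partitionKey = partitionKey;
        this.key = key;
        this.message = message;
    }

    // 3-arg form: no explicit partition key, so reuse the key for routing.
    OutgoingMessageEnvelopeSketch(SystemStream ss, Object key, Object message) {
        this(ss, key, key, message);
    }

    Object getPartitionKey() {
        return partitionKey;
    }
}
```

If that fallback is right, then for jobs that don't care about log compaction, the 3-arg constructor's key is exactly what routes the message to a partition (and hence to a task instance).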
