Hey Tyson,

Yeah, sorry this is not more clear. Physical Kafka partitions are per-topic. The default partition count for a newly created topic in Kafka is defined using the num.partitions setting, as you've discovered. The default that Kafka ships with is 1. This can be overridden on a per-topic basis by using the kafka-create-topic.sh tool.
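For example, something like this creates the topic up front with five partitions (the topic name here is just an example, and the exact flag names differ between Kafka releases; newer versions ship kafka-topics.sh --create --partitions instead, so check the scripts in your bin/ directory):

  bin/kafka-create-topic.sh --zookeeper localhost:2181 --topic wikipedia-edits --partition 5 --replica 1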
Cheers,
Chris

On 4/14/14 12:47 PM, "Tyson Norris" <[email protected]> wrote:

> OK that was wrong as well (my SystemFactory is indeed supposed to use a single partition, just not the kafka system factory), but I finally got things working as expected with some kafka config changes.
>
> For now, I set in deploy/kafka/config/server.properties:
> num.partitions=5
>
> (although I assume there is a better per-topic value I should set instead of a default like this)
>
> And now I see multiple task instances created as desired.
>
> Thanks
> Tyson
>
> On Apr 14, 2014, at 12:36 PM, Tyson Norris <[email protected]> wrote:
>
>> OK sorry for the noise.
>> I stumbled upon another clue - my SystemFactory has (based on WikipediaSystemFactory):
>>
>> @Override
>> public SystemAdmin getAdmin(String systemName, Config config) {
>>     return new SinglePartitionWithoutOffsetsSystemAdmin();
>> }
>>
>> Which I guess is a good reason my system is only using a single partition. Doh.
>> I will work on a new SystemFactory impl to test with...
>>
>> Thanks
>> Tyson
>>
>> On Apr 14, 2014, at 12:20 PM, Tyson Norris <[email protected]> wrote:
>>
>>> I am actually wondering if I'm missing a bit of configuration that indicates the number of partitions I want to create for various kafka topics that messages are sent to?
>>>
>>> I don't see where this should be added in the config, and it appears the partitions are not created automatically when I specify the key for partitioning.
>>>
>>> Thanks
>>> Tyson
>>>
>>> On Apr 14, 2014, at 10:35 AM, Tyson Norris <[email protected]> wrote:
>>>
>>> Hi Chris -
>>>
>>> On Apr 14, 2014, at 9:13 AM, Chris Riccomini <[email protected]> wrote:
>>>
>>> Hey Tyson,
>>>
>>> """I'm trying to have multiple task instances run, each processing a separate partition, and it appears that only a single task instance runs, processing all partitions. Or else my partitions are not created properly."""
>>>
>>> Are you trying to consume a single stream that has multiple partitions, or are you processing multiple streams that all have one partition? If it's the latter, all of these messages will get routed to a single task instance. There is an upcoming patch to allow alternative partition-to-task-instance mappings (SAMZA-71), which Jakob Homan is working on currently.
>>>
>>> No, I'm trying for the former, although my SystemConsumer is set up like the latter. That is, I have a system consumer that should generate messages in a single partition, and a task that takes all messages and splits them into multiple partitions.
>>>
>>> So, in my SystemConsumer I have:
>>>
>>> SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, streamId, new Partition(0));
>>> try {
>>>     put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null, object));
>>>
>>> which generates messages on the same stream + partition.
>>>
>>> Then in my first task I have:
>>>
>>> messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, partitionKey.getBytes(), outgoingMap));
>>>
>>> which I am trying to get routed to separate task instances based on partitionKey.
>>>
>>> If you have a single input stream with multiple partitions, you should end up with one task instance per partition. This partitioning model is explained in some detail at the 20 minute mark in this talk:
>>>
>>> http://www.infoq.com/presentations/samza-linkedin
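To make that mapping concrete, here is a minimal sketch from the task's point of view, assuming the standard StreamTask API (the class name and the log line are made up for illustration): with a single multi-partition input, each TaskInstance only ever sees envelopes from its one assigned partition.

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// Illustration only: a TaskInstance is bound to a single partition of its input,
// so every envelope seen here reports the same partition id.
public class PartitionEchoTask implements StreamTask {
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        int partitionId = envelope.getSystemStreamPartition().getPartition().getPartitionId();
        System.out.println("handled a message from partition " + partitionId);
    }
}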
>>> """the example in docs of collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems confusing, because the OutgoingMessageEnvelope constructor has a key AND partitionKey - I assume the partitionKey should be msg.get("user_id") in this case, but what should key be? Just a unique value for this message?"""
>>>
>>> The OutgoingMessageEnvelope has several constructors. The two you're referring to are:
>>>
>>> public OutgoingMessageEnvelope(SystemStream systemStream, Object partitionKey, Object key, Object message)
>>>
>>> public OutgoingMessageEnvelope(SystemStream systemStream, Object key, Object message)
>>>
>>> This is, indeed, odd. In general, people only want the second constructor (systemStream, key, message). The constructor with the partitionKey has its origins in the Kafka API. With Kafka 0.8, keys are now stored along with messages in the actual log segments on disk. This is useful because it means you can get access to the key information that was sent with the message. It also means that you can use log compaction to de-duplicate keys in a Kafka topic (a 0.8.1 feature). There are some cases where you might wish to partition a topic by one key (say, member ID), but store (or de-duplicate by) a different key with the message.
>>>
>>> So in the case where I don't care about deduplication, is the second constructor's "key" parameter actually used as the partition key?
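A hedged sketch of the two forms side by side (the helper class, topic name, and map keys below are hypothetical; as far as I can tell, with the three-argument form the key is also what the producer falls back to when choosing a partition, but double-check that against your Samza version):

import java.util.Map;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;

public class SendExamples {
    private static final SystemStream OUTPUT = new SystemStream("kafka", "SomeTopicPartitionedByUserId");

    public static void send(MessageCollector collector, Map<String, Object> msg) {
        // Three-argument form: the key is stored with the message; absent an explicit
        // partitionKey it is also (apparently) used to pick the partition.
        collector.send(new OutgoingMessageEnvelope(OUTPUT, msg.get("user_id"), msg));

        // Four-argument form: partition by user_id, but store a different key
        // (here a hypothetical message id) with the message, e.g. for log compaction.
        collector.send(new OutgoingMessageEnvelope(OUTPUT, msg.get("user_id"), msg.get("message_id"), msg));
    }
}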
>>> """Do I need to specify partition manager and yarn.container.count to get multiple instances of my task working to service separate partitions?"""
>>>
>>> This class has been replaced by the KafkaSystemFactory and KafkaSystemAdmin. As long as you've specified a KafkaSystemFactory, the partitions will be handled properly by Samza. The yarn.container.count simply specifies how many containers (java processes) you get to run your tasks in. If you have only one TaskInstance, but specify a container count of 2, the second container won't have any partitions to process, and I believe the job will fail. You need to set your container count <= the partition count of your input topics.
>>>
>>> Ok, so it sounds like I should be able to have multiple task instances in a single container, if the partitioning works.
>>>
>>> Thanks!
>>> Tyson
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 4/11/14 12:55 PM, "Tyson Norris" <[email protected]> wrote:
>>>
>>> Possibly related - I cannot seem to find the source for KafkaPartitionManager - can someone point me to it?
>>>
>>> Thanks
>>> Tyson
>>>
>>> On Apr 11, 2014, at 12:15 PM, Tyson Norris <[email protected]> wrote:
>>>
>>> Hi -
>>> I have a couple questions about partitioning - I'm trying to have multiple task instances run, each processing a separate partition, and it appears that only a single task instance runs, processing all partitions. Or else my partitions are not created properly. This is based on a modified version of hello-samza, so I'm not sure exactly which config+code steps to take to enable partitioning of messages to multiple instances of the same task.
>>>
>>> To route to a partition I use:
>>>
>>> messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey, outgoingMap));
>>>
>>> - question here: the example in docs of collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems confusing, because the OutgoingMessageEnvelope constructor has a key AND partitionKey - I assume the partitionKey should be msg.get("user_id") in this case, but what should key be? Just a unique value for this message?
>>>
>>> I tried the 3 parameter constructor as well and have similar problems, where my single task instance is used regardless of the partitionKey specified in the OutgoingMessageEnvelope.
>>>
>>> Do I need to specify partition manager and yarn.container.count to get multiple instances of my task working to service separate partitions?
>>>
>>> I'm not sure how to tell if my messages are routed to the correct partition in kafka, or whether the problem is a partition handling config in samza.
>>>
>>> Any advice is appreciated!
>>>
>>> Thanks
>>> Tyson
>>>
>>
>
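For completeness, a rough sketch of the job configuration pieces discussed above, in the style of hello-samza's .properties files (the job name, task class, and topic are made up, and key names should be checked against your Samza version):

job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=partitioned-example-job
task.class=samza.examples.wikipedia.task.MyPartitionedTask

# one TaskInstance is created per partition of this input topic
task.inputs=kafka.SomeTopicPartitionedByUserId

# the Kafka system; KafkaSystemFactory/KafkaSystemAdmin handle partition discovery
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.metadata.broker.list=localhost:9092

# must be <= the partition count of the input topic(s)
yarn.container.count=2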
