Hey Tyson,

Yeah, sorry this is not more clear. Physical Kafka partitions are
per-topic. The default partition count for a newly created topic in Kafka
is defined using the num.partitions setting, as you've discovered. The
default setting that Kafka ships with is 1. This can be overridden on a
per-topic basis by using the kafka-create-topic.sh tool.
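For reference, the two knobs look roughly like this (flag names as in the Kafka 0.8-era scripts; the topic name is just illustrative):

```properties
# deploy/kafka/config/server.properties
# Broker-wide default applied to any topic created without an explicit count:
num.partitions=1

# Per-topic override at creation time, e.g.:
#   bin/kafka-create-topic.sh --zookeeper localhost:2181 \
#       --topic wikipedia-raw --partition 5 --replica 1
```

Topics that already exist keep their original partition count, so changing num.partitions only affects topics created afterwards.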

Cheers,
Chris

On 4/14/14 12:47 PM, "Tyson Norris" <[email protected]> wrote:

>OK that was wrong as well (my SystemFactory is indeed supposed to use a
>single partition, just not the kafka system factory), but I finally got
>things working as expected with some kafka config changes.
>
>For now, I set in deploy/kafka/config/server.properties:
>num.partitions=5
>
>(although I assume there is a better per-topic value I should set instead
>of a default like this)
>
>And now I see multiple task instances created as desired.
>
>Thanks
>Tyson
>
>On Apr 14, 2014, at 12:36 PM, Tyson Norris <[email protected]> wrote:
>
>> OK sorry for the noise.
>> I stumbled upon another clue - my SystemFactory has (based on
>>WikipediaSystemFactory):
>>    @Override
>>    public SystemAdmin getAdmin(String systemName, Config config) {
>>        return new SinglePartitionWithoutOffsetsSystemAdmin();
>>    }
>> 
>> Which I guess is a good reason my system is only using a single
>>partition. Doh.
>> I will work on a new SystemFactory impl to test with...
>> 
>> Thanks
>> Tyson
>> 
>> On Apr 14, 2014, at 12:20 PM, Tyson Norris <[email protected]> wrote:
>> 
>>> I am actually wondering if I’m missing a bit of configuration that
>>>indicates the number of partitions I want to create for various kafka
>>>topics that messages are sent to?
>>> 
>>> I don’t see where this should be added in the config, and it appears
>>>the partitions are not created automatically when I specify the key for
>>>partitioning.
>>> 
>>> 
>>> Thanks
>>> Tyson
>>> 
>>> 
>>> 
>>> On Apr 14, 2014, at 10:35 AM, Tyson Norris
>>><[email protected]<mailto:[email protected]>> wrote:
>>> 
>>> Hi Chris -
>>> 
>>> On Apr 14, 2014, at 9:13 AM, Chris Riccomini
>>><[email protected]<mailto:[email protected]>> wrote:
>>> 
>>> Hey Tyson,
>>> 
>>> """I'm trying to have multiple task instances run, each processing a
>>> separate partition, and it appears that only a single task instance
>>>runs,
>>> processing all partitions. Or else my partitions are not created
>>> properly."""
>>> 
>>> Are you trying to consume a single stream that has multiple
>>>partitions, or
>>> are you processing multiple streams that all have one partition? If
>>>it's
>>> the latter, all of these messages will get routed to a single task
>>> instance. There is an upcoming patch to allow alternative partition
>>>task
>>> instance mappings (SAMZA-71), which Jakob Homan is working on
>>>currently.
>>> 
>>> No I’m trying for the former, although my SystemConsumer is set up
>>>like the latter.  That is, I have a system consumer that should
>>>generate messages in a single partition, and a task that takes all
>>>messages and splits them into multiple partitions.
>>> 
>>> So, in my SystemConsumer I have:
>>>     SystemStreamPartition systemStreamPartition =
>>>         new SystemStreamPartition(systemName, streamId, new Partition(0));
>>>     try {
>>>         put(systemStreamPartition, new IncomingMessageEnvelope(
>>>             systemStreamPartition, null, null, object));
>>> which generates messages on the same stream + partition.
>>> 
>>> Then in my first task I have:
>>> messageCollector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>>>partitionKey.getBytes(), outgoingMap));
>>> 
>>> which I am trying to get routed to separate task instances based on
>>>partitionKey.
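The routing step here effectively reduces to hash(partitionKey) mod the topic's partition count. A self-contained sketch of that idea (this mirrors what Kafka 0.8's default partitioner does, but it is an illustration, not the actual Kafka class):

```java
// Sketch: how a partition key maps to a physical partition.
// Not Kafka code; just hash-mod over the topic's partition count.
public class PartitionSketch {
    static int partitionFor(Object partitionKey, int numPartitions) {
        // Mask to non-negative rather than Math.abs (abs(MIN_VALUE) is negative).
        return (partitionKey.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With 5 partitions, different keys can spread across task instances.
        System.out.println("user_42 -> " + partitionFor("user_42", 5));
        // With num.partitions=1, every key maps to partition 0, so all
        // messages land on a single task instance regardless of partitionKey.
        System.out.println("user_42 (1 partition) -> " + partitionFor("user_42", 1));
    }
}
```

The takeaway: setting a partitionKey on the envelope only spreads work if the output topic actually has more than one partition.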
>>> 
>>> 
>>> If you have a single input stream with multiple partitions, you should
>>>end
>>> up with one task instance per partition. This partitioning model is
>>> explained in some detail at the 20 minute mark in this talk:
>>> 
>>> http://www.infoq.com/presentations/samza-linkedin
>>> 
>>> """the example in docs of collector.send(new
>>>OutgoingMessageEnvelope(new
>>> SystemStream("kafka", "SomeTopicPartitionedByUserId"),
>>>msg.get("user_id"),
>>> msg)) seems confusing, because the OutgoingMessageEnvelope constructor
>>>has
>>> a key AND partitionKey - I assume the partitionKey should be
>>> msg.get("user_id") in this case, but what should key be? Just a unique
>>> value for this message?"""
>>> 
>>> The OutgoingMessageEnvelope has several constructors. The two you're
>>> referring to are:
>>> 
>>> public OutgoingMessageEnvelope(SystemStream systemStream, Object
>>> partitionKey, Object key, Object message)
>>> 
>>> public OutgoingMessageEnvelope(SystemStream systemStream, Object key,
>>> Object message)
>>> 
>>> 
>>> This is, indeed, odd. In general, people only want the second
>>>constructor
>>> (systemStream, key, message). The constructor with the partitionKey has
>>> its origins in the Kafka API. With Kafka 0.8, keys are now stored along
>>> with messages in the actual log segments on the disk. This is useful
>>> because it means you can get access to the key information that was
>>>sent
>>> with the message. It also means that you can use log-compaction to
>>> de-duplicate keys in a Kafka topic (a 0.8.1 feature). There are some
>>> cases where you might wish to partition a topic by one key (say, member
>>> ID), but store (or de-duplicate by) a different key with the message.
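That split can be made concrete with a toy model (plain Java with hypothetical names; not Samza or Kafka code): the partitionKey decides *where* the message goes, while the key is merely *stored with* it for later use, e.g. compaction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a partitioned, keyed log. Illustrative only.
public class KeyedLogSketch {
    static final int NUM_PARTITIONS = 3;
    static final Map<Integer, List<String[]>> log = new HashMap<>();

    // partitionKey -> routing; key -> stored alongside the message.
    static int send(Object partitionKey, Object key, Object message) {
        int partition = (partitionKey.hashCode() & 0x7fffffff) % NUM_PARTITIONS;
        log.computeIfAbsent(partition, p -> new ArrayList<>())
           .add(new String[] { String.valueOf(key), String.valueOf(message) });
        return partition;
    }

    public static void main(String[] args) {
        // Partition by member ID, but store a per-event key with each message.
        int p1 = send("member-7", "event-100", "page-view");
        int p2 = send("member-7", "event-101", "click");
        // Same partitionKey => same partition, even though stored keys differ.
        System.out.println(p1 == p2); // prints true
    }
}
```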
>>> 
>>> 
>>> So in the case where I don’t care about deduplication, is the second
>>>constructor “key” parameter actually used as partition key?
>>> 
>>> 
>>> 
>>> 
>>> """Do I need to specify partition manager and yarn.container.count to
>>>get
>>> multiple instances of my task working to service separate
>>>partitions?"""
>>> 
>>> This class has been replaced by the KafkaSystemFactory and
>>> KafkaSystemAdmin. As long as you've specified a KafkaSystemFactory, the
>>> partitions will be handled properly by Samza. The yarn.container.count
>>> simply specifies how many containers (java processes) you get to run
>>>your
>>> tasks in. If you have only one TaskInstance, but specify a container
>>>count
>>> of 2, the second container won't have any partitions to process, and I
>>> believe the job will fail. You need to set your container count <= the
>>> partition count of your input topics.
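As a concrete config sketch (property name as used in Samza job configs of this era; counts are illustrative):

```properties
# Suppose the input topic has 5 partitions => Samza creates 5 task instances.
# Containers are just the JVMs those instances are packed into; with 2
# containers, each runs 2-3 of the 5 task instances. Keep this value
# <= the input topic's partition count, or some containers have nothing to do.
yarn.container.count=2
```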
>>> 
>>> Ok, so it sounds like I should be able to have multiple task instances
>>>in a single container, if the partitioning works.
>>> 
>>> Thanks!
>>> Tyson
>>> 
>>> 
>>> 
>>> Cheers,
>>> Chris
>>> 
>>> On 4/11/14 12:55 PM, "Tyson Norris"
>>><[email protected]<mailto:[email protected]>> wrote:
>>> 
>>> Possibly related - I cannot seem to find the source for
>>> KafkaPartitionManager - can someone point me to it?
>>> 
>>> Thanks
>>> Tyson
>>> 
>>> On Apr 11, 2014, at 12:15 PM, Tyson Norris
>>><[email protected]<mailto:[email protected]>> wrote:
>>> 
>>> Hi -
>>> I have a couple of questions about partitioning - I'm trying to have
>>> multiple task instances run, each processing a separate partition, and
>>> it appears that only a single task instance runs, processing all
>>> partitions. Or else my partitions are not created properly. This is
>>> based on a modified version of hello-samza, so I'm not sure exactly
>>> which config+code steps to take to enable partitioning of messages to
>>> multiple instances of the same task.
>>> 
>>> To route to a partition i use: messageCollector.send(new
>>> OutgoingMessageEnvelope(OUTPUT_STREAM, msgKey, partitionKey,
>>> outgoingMap));
>>> - question here: the example in docs of collector.send(new
>>> OutgoingMessageEnvelope(new SystemStream("kafka",
>>> "SomeTopicPartitionedByUserId"), msg.get("user_id"), msg)) seems
>>> confusing, because the OutgoingMessageEnvelope constructor has a key
>>>AND
>>> partitionKey - I assume the partitionKey should be msg.get("user_id")
>>>in
>>> this case, but what should key be? Just a unique value for this
>>>message?
>>> 
>>> I tried the 3 parameter constructor as well and have similar problems,
>>> where my single task instance is used regardless of partitionKey
>>> specified in the OutgoingMessageEnvelope.
>>> 
>>> Do I need to specify partition manager and yarn.container.count to get
>>> multiple instances of my task working to service separate partitions?
>>> 
>>> I'm not sure how to tell if my messages are routed to the correct
>>> partition in kafka, or whether the problem is a partition handling
>>> config in samza.
>>> 
>>> Any advice is appreciated!
>>> 
>>> Thanks
>>> Tyson
>>> 
>> 
>
