[ 
https://issues.apache.org/jira/browse/SAMZA-82?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13818360#comment-13818360
 ] 

Chris Riccomini commented on SAMZA-82:
--------------------------------------

Thinking on this a bit more. One way to fix this would be to remove the 
SAMZA_PARTITION_IDS environment variable in ShellCommandConfig, and replace it 
with something like SAMZA_STREAM_PARTITIONS, or something. The 
SAMZA_PARTITION_IDS just defines which partitions a SamzaContainer is 
responsible for, but this is actually not really what we want: we want the 
stream/partition parts. If we set something like this:

{noformat}
SAMZA_STREAM_PARTITIONS=foo.bar:0,foo.bar:2,foo.baz:0
{noformat}

Then the SamzaContainer would know exactly which partitions AND streams it was 
responsible for. This would allow us to prevent TaskInstance from accidentally 
registering non-existent partitions for a topic (e.g foo.baz:2).

The partition assignment is done in two spots: SamzaAppMasterTaskManager.scala, 
and LocalJobFactory.scala. In both cases, we'd have to switch to building a set 
of all individual SystemStreamPartitions, and then do a projection of that set 
when building the SAMZA_STREAM_PARTITIONS value.

> Not use maximum number of partitions when initializing streams
> --------------------------------------------------------------
>
>                 Key: SAMZA-82
>                 URL: https://issues.apache.org/jira/browse/SAMZA-82
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.7.0
>            Reporter: Jakob Homan
>            Assignee: Jakob Homan
>             Fix For: 0.7.0
>
>
> Util.scala:
> {code}  /**
>    * Uses config to create SystemAdmin classes for all input stream systems to
>    * get each input stream's partition count, then returns the maximum count.
>    * An input stream with two partitions, and a second input stream with four
>    * partitions would result in this method returning 4.
>    */
>   def getMaxInputStreamPartitions(config: Config) = {
> {code}
> This approach works if all the streams have the same number of partitions, 
> but is inefficient for other cases and, where the underlying system gets 
> cranky about being asked about non-existing partitions, fails.  We should 
> eagerly figure out the correct number of partitions for each topic and pass 
> that information from there.



--
This message was sent by Atlassian JIRA
(v6.1#6144)

Reply via email to