Github user srdo commented on the issue:
    I think we should consider if we can improve the way this spout implements 
the Trident API.
    The Trident API is expecting that the Coordinator figures out which 
partitions exist for a batch. The partitions are passed to the spout executors 
("coordinatorMeta"), and it is expected that the Emitter filters that list to 
get the partitions assigned to itself. Please see
    Deciding which partitions exist is the responsibility of the Coordinator, 
but this implementation puts that responsibility in the Emitter, which causes 
us to have to hack around with e.g. the enum instance or having to drop an emit 
because the partition is no longer assigned to this task. It will break if the 
scheduler happens to put the Coordinator in a different worker from any of the 
Emitter tasks. The Emitter code ends up being confusing, e.g. refreshPartitions 
does nothing, but logs based on the coordinatorMeta, which may be different 
from what the spout is actually assigned. I also think we make it easier to 
maintain OpaquePartitionedTridentSpoutExecutor if we don't have to keep in mind 
that the Kafka spout doesn't implement the API in the expected way.
    We already have the clean separation we need to implement the API as 
specified, but they're conflated in the Subscription interface.  The 
Coordinator should use PartitionFilter and its own KafkaConsumer instance 
(which we should add) to get the list of batch partitions instead of asking the 
KafkaTridentSpoutManager, so we get a nice decoupling from the Emitter. We 
should put the refresh subscription timer in the Coordinator as well. The 
Emitter should receive these partitions in getPartitionsForTask, use 
ManualPartitioner to decide which partitions are assigned to the task, and 
assign them on the consumer. 
    We'd need to change either KafkaSpoutConfig or Subscription a bit so we can 
get at the partitioner and filter classes, but I think it should be doable.
    Somewhat related: I noticed that OpaquePartitionedSpoutExecutor was changed 
in to fix this spout. It might be 
good to deprecate the getOrderedPartitions/refreshPartitions methods in 1.x so 
we can remove them from master. Right now the functionality seems duplicated on 
the Emitter interface, since getPartitionsForTask has the same purpose.

If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at or file a JIRA ticket
with INFRA.

Reply via email to