Github user srdo commented on the issue:
I think we should consider whether we can improve the way this spout implements
the Trident API.
The Trident API expects the Coordinator to figure out which partitions exist
for a batch. The partitions are passed to the spout executors
("coordinatorMeta"), and the Emitter is expected to filter that list to get the
partitions assigned to itself. Please see
Deciding which partitions exist is the responsibility of the Coordinator,
but this implementation puts that responsibility in the Emitter. That forces us
to hack around it, e.g. with the enum instance, or to drop an emit because the
partition is no longer assigned to this task. It will also break if the
scheduler happens to put the Coordinator in a worker that hosts none of the
Emitter tasks. The Emitter code ends up being confusing as well: e.g.
refreshPartitions does nothing but log based on the coordinatorMeta, which may
differ from what the spout is actually assigned. I also think
OpaquePartitionedTridentSpoutExecutor would be easier to maintain if we didn't
have to keep in mind that the Kafka spout doesn't implement the API in the
expected way.
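To make the expected division of labour concrete, here is a minimal sketch of the contract in plain Java. `BatchCoordinator`, `BatchEmitter`, `RoundRobinEmitter` and `TridentContractDemo` are hypothetical, simplified stand-ins, not Storm's actual interfaces, and round-robin is just one possible way to split the list. The point it illustrates is that only the Coordinator decides which partitions exist, and each Emitter merely filters the list it is handed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the roles in Trident's opaque partitioned
// spout API; the real Storm interfaces have different names, generics and more methods.
interface BatchCoordinator<P> {
    // Runs in one place: decides which partitions exist for the next batch.
    List<P> getPartitionsForBatch();
}

interface BatchEmitter<P> {
    // Runs in every spout task: keeps only the partitions this task owns.
    List<P> getPartitionsForTask(int taskIndex, int numTasks, List<P> allPartitionsSorted);
}

// One possible assignment: task k takes partitions k, k + numTasks, k + 2 * numTasks, ...
class RoundRobinEmitter<P> implements BatchEmitter<P> {
    @Override
    public List<P> getPartitionsForTask(int taskIndex, int numTasks, List<P> allPartitionsSorted) {
        List<P> mine = new ArrayList<>();
        for (int i = taskIndex; i < allPartitionsSorted.size(); i += numTasks) {
            mine.add(allPartitionsSorted.get(i));
        }
        return mine;
    }
}

final class TridentContractDemo {
    private TridentContractDemo() {}

    // Models the executor: the coordinator's list ("coordinatorMeta") is shipped to every
    // emitter task, and each emitter filters it without consulting any shared state.
    static <P> List<P> partitionsForTask(BatchCoordinator<P> coordinator, BatchEmitter<P> emitter,
                                         int taskIndex, int numTasks) {
        List<P> coordinatorMeta = coordinator.getPartitionsForBatch();
        return emitter.getPartitionsForTask(taskIndex, numTasks, coordinatorMeta);
    }
}
```

With five partitions and two tasks, task 0 gets `t-0`, `t-2`, `t-4` and task 1 gets `t-1`, `t-3`. Nothing in the Emitter depends on which worker the Coordinator runs in.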
We already have the abstractions we need to implement the API as specified,
but they're conflated in the Subscription interface. The Coordinator should use
PartitionFilter and its own KafkaConsumer instance (which we should add) to get
the list of batch partitions instead of asking the KafkaTridentSpoutManager,
which decouples it cleanly from the Emitter. We should put the subscription
refresh timer in the Coordinator as well. The Emitter should receive these
partitions in getPartitionsForTask, use ManualPartitioner to decide which
partitions are assigned to the task, and assign them on the consumer.
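A rough sketch of that split, under stated assumptions: `PartitionListing` stands in for the Coordinator's own KafkaConsumer, `PartitionFilterSketch` for PartitionFilter and `PartitionerSketch` for ManualPartitioner. All of these, plus `CoordinatorSketch` and `EmitterSketch`, are hypothetical simplifications; partitions are modelled as strings such as `orders-0`, the clock is injected so the refresh timer can be exercised, and the actual `consumer.assign(...)` call is only indicated in a comment.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the Coordinator's own KafkaConsumer: lists every partition it sees.
interface PartitionListing {
    List<String> listAllPartitions();
}

// Hypothetical stand-in for PartitionFilter: keeps the partitions the spout subscribes to.
interface PartitionFilterSketch {
    List<String> filter(List<String> allPartitions);
}

// Hypothetical stand-in for ManualPartitioner: picks this task's share of the batch partitions.
interface PartitionerSketch {
    List<String> partition(List<String> allPartitionsSorted, int taskIndex, int numTasks);
}

class CoordinatorSketch {
    private final PartitionListing listing;
    private final PartitionFilterSketch filter;
    private final long refreshIntervalMs;
    private final LongSupplier clockMs;
    private long lastRefreshMs;
    private List<String> cached; // null until the first refresh

    CoordinatorSketch(PartitionListing listing, PartitionFilterSketch filter,
                      long refreshIntervalMs, LongSupplier clockMs) {
        this.listing = listing;
        this.filter = filter;
        this.refreshIntervalMs = refreshIntervalMs;
        this.clockMs = clockMs;
    }

    // The refresh timer lives here: metadata is re-read only once the interval has elapsed.
    List<String> getPartitionsForBatch() {
        long now = clockMs.getAsLong();
        if (cached == null || now - lastRefreshMs >= refreshIntervalMs) {
            List<String> sorted = new ArrayList<>(filter.filter(listing.listAllPartitions()));
            Collections.sort(sorted); // every Emitter task must see the same order
            cached = Collections.unmodifiableList(sorted);
            lastRefreshMs = now;
        }
        return cached;
    }
}

class EmitterSketch {
    private final PartitionerSketch partitioner;
    private List<String> assigned = Collections.emptyList();

    EmitterSketch(PartitionerSketch partitioner) {
        this.partitioner = partitioner;
    }

    // Receives the Coordinator's list and decides locally which partitions this task owns.
    List<String> getPartitionsForTask(int taskIndex, int numTasks, List<String> allPartitionsSorted) {
        List<String> mine = partitioner.partition(allPartitionsSorted, taskIndex, numTasks);
        // Real code would call consumer.assign(...) with the matching TopicPartitions here.
        assigned = Collections.unmodifiableList(new ArrayList<>(mine));
        return assigned;
    }

    List<String> assigned() {
        return assigned;
    }
}
```

Sorting in the Coordinator is a deliberate choice in this sketch: as long as every Emitter task filters the same ordered list with the same partitioner, the tasks' shares cannot overlap, and no Emitter ever needs to talk to the Coordinator's consumer.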
We'd need to change either KafkaSpoutConfig or Subscription a bit so we can
get at the partitioner and filter classes, but I think it should be doable.
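One shape such a change could take, shown as a hypothetical sketch only: a config object that exposes the filter and the partitioner separately, so the Coordinator can use just the filter and the Emitter just the partitioner. `SpoutConfigSketch`, `TaskPartitioner` and the builder method names are invented for illustration and are not KafkaSpoutConfig's actual API; the filter is modelled as a `UnaryOperator` over partition names for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for ManualPartitioner.
interface TaskPartitioner {
    List<String> partition(List<String> allPartitionsSorted, int taskIndex, int numTasks);
}

// Hypothetical config exposing the filter and partitioner directly, instead of
// hiding both behind a single Subscription implementation.
final class SpoutConfigSketch {
    private final UnaryOperator<List<String>> partitionFilter;
    private final TaskPartitioner partitioner;

    private SpoutConfigSketch(Builder b) {
        this.partitionFilter = b.partitionFilter;
        this.partitioner = b.partitioner;
    }

    UnaryOperator<List<String>> getPartitionFilter() { return partitionFilter; } // Coordinator side
    TaskPartitioner getPartitioner() { return partitioner; }                     // Emitter side

    static final class Builder {
        private UnaryOperator<List<String>> partitionFilter = UnaryOperator.identity();
        private TaskPartitioner partitioner = (all, task, numTasks) -> {
            // Default: round-robin over the sorted partition list.
            List<String> mine = new ArrayList<>();
            for (int i = task; i < all.size(); i += numTasks) {
                mine.add(all.get(i));
            }
            return mine;
        };

        Builder setPartitionFilter(UnaryOperator<List<String>> filter) {
            this.partitionFilter = Objects.requireNonNull(filter);
            return this;
        }

        Builder setPartitioner(TaskPartitioner partitioner) {
            this.partitioner = Objects.requireNonNull(partitioner);
            return this;
        }

        SpoutConfigSketch build() {
            return new SpoutConfigSketch(this);
        }
    }
}
```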
Somewhat related: I noticed that OpaquePartitionedTridentSpoutExecutor was
changed in https://github.com/apache/storm/pull/1995 to fix this spout. It might
be good to deprecate the getOrderedPartitions/refreshPartitions methods in 1.x
so we can remove them from master. Right now their functionality seems
duplicated in the Emitter interface, since getPartitionsForTask serves the same
purpose.
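A possible deprecation path, sketched on a hypothetical, simplified Emitter (`PartitionedEmitterSketch`). The parameter types here are simplified and need not match the real interface. Giving the overlapping methods `@Deprecated` default bodies would let new implementations provide only getPartitionsForTask while existing callers keep compiling until the methods are removed.

```java
import java.util.List;

// Hypothetical, simplified Emitter shape; the real Trident Emitter differs in names and generics.
interface PartitionedEmitterSketch<P> {
    // The one hook implementors must provide: pick this task's partitions from the
    // Coordinator's sorted list (and, in a Kafka spout, assign them on the consumer).
    List<P> getPartitionsForTask(int taskIndex, int numTasks, List<P> allPartitionsSorted);

    /**
     * @deprecated duplicates the purpose of getPartitionsForTask; kept with a default
     * body so existing implementations and callers compile until it is removed.
     */
    @Deprecated
    default List<P> getOrderedPartitions(List<P> allPartitionInfo) {
        return allPartitionInfo; // assumes the Coordinator already sorted the list
    }

    /**
     * @deprecated assignment happens in getPartitionsForTask instead.
     */
    @Deprecated
    default void refreshPartitions(List<P> partitionResponsibilities) {
        // Intentionally a no-op.
    }
}
```

Because only one abstract method remains, an Emitter written against this shape can even be a lambda, which makes the remaining contract easy to see.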