Hey there,

Over the weekend I was debugging a case where the Streams configuration was
not being passed down to the threads. I noticed that one of the code paths in
KafkaConsumer (L743) initializes the StreamPartitionAssignor:

this.assignors = config.getConfiguredInstances(
        ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        PartitionAssignor.class);

However, this uses the ConsumerConfig instance (the config that is passed in),
so if I want to change a configuration in the assignor, I need to add the
consumer prefix. To make debugging even harder, the logAll() function in
AbstractConfig prints "StreamsConfig values" at the beginning, since the
object being logged is indeed a StreamsConfig:

@Override
public void configure(final Map<String, ?> configs) {
    final StreamsConfig streamsConfig = new StreamsConfig(configs);

(L190 in StreamPartitionAssignor)


This further confuses developers, as they see two different sets of
StreamsConfig values: one from the top level, and one from this derived level
per thread.


My point is that we could either:

1. make developers aware that they need to add the consumer prefix to pass
   configs to StreamPartitionAssignor, or
2. find a way to pass in the original StreamsConfig.
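
To illustrate option 1, here is a minimal, self-contained sketch of the
prefix behavior described above. It is a toy model and not Kafka's actual
code: the class ConsumerPrefixSketch, the helper consumerView(), and the key
"my.assignor.setting" are all hypothetical, and the real StreamsConfig logic
may forward additional keys. It only shows why a setting without the
"consumer." prefix would not reach a component configured from the
consumer-level config.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPrefixSketch {
    // Assumption: mirrors the "consumer." prefix used by Kafka Streams.
    static final String CONSUMER_PREFIX = "consumer.";

    // Hypothetical helper: builds the map a consumer-level component
    // (such as an assignor) would see in this simplified model.
    // Only keys carrying the prefix are forwarded, with the prefix stripped.
    static Map<String, Object> consumerView(Map<String, Object> streamsProps) {
        Map<String, Object> view = new HashMap<>();
        for (Map.Entry<String, Object> e : streamsProps.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(CONSUMER_PREFIX)) {
                view.put(key.substring(CONSUMER_PREFIX.length()), e.getValue());
            }
        }
        return view;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // Hypothetical settings, for illustration only.
        props.put("other.setting", "not-forwarded");
        props.put(CONSUMER_PREFIX + "my.assignor.setting", "42");

        // Only the prefixed key survives, under its un-prefixed name.
        System.out.println(consumerView(props));
    }
}
```

In this model the assignor would find "my.assignor.setting" but never
"other.setting", which matches the behavior I ran into.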

I know this is a somewhat lengthy description. Let me know if anything about
my proposal is unclear, or if this is not a concern because most people
already know the trick here. Thank you!
