[ 
https://issues.apache.org/jira/browse/SAMZA-144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14018325#comment-14018325
 ] 

Yan Fang commented on SAMZA-144:
--------------------------------

Yes, if sharing connections among streams, we should have a common 
configuration for it instead of stream-specific. 

On the consumer side, I can not find any other configs per-topic from the 
reference. In the KafkaSystemFactory, there is a TODO:

{code}
//TODO could add stream-level overrides for timeout and buffer size
val timeout = consumerConfig.socketTimeoutMs
val bufferSize = consumerConfig.socketReceiveBufferBytes
{code}

Based our discussion, we may not think this is a "TODO" any more. Also, 
auto.offset.reset has already been implemented. So the only stream-level 
override I can put is the 

* fetchSize

In the producer side, actually I even do not think we should use override 
because the system has no knowledge about the produced stream before hand. They 
are defined in the application layer. (at the same time, there is no per-topic 
config in the configuration sheet).

Therefore, if you also agree, I will add fetchSize stream-level override. Thank 
you.







> Add stream-level overrides for KafkaSystemFactory config
> --------------------------------------------------------
>
>                 Key: SAMZA-144
>                 URL: https://issues.apache.org/jira/browse/SAMZA-144
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Yan Fang
>
> Currently, we allow Kafka systems to be defined with:
> {noformat}
> systems.my-kafka-system.consumer.*
> {noformat}
> And:
> {noformat}
> systems.my-kafka-system.producer.*
> {noformat}
> We don't support stream-level configurations, the way we do with StreamConfig 
> configuration:
> {code}
>   val STREAM_PREFIX = "systems.%s.streams.%s."
>   val MSG_SERDE = STREAM_PREFIX + "samza.msg.serde"
>   val KEY_SERDE = STREAM_PREFIX + "samza.key.serde"
>   val CONSUMER_RESET_OFFSET = STREAM_PREFIX + "samza.reset.offset"
> {code}
> We should add stream-level configuration override support for Kafka consumer 
> and producers with:
> {noformat}
> systems.my-kafka-system.streams.my-kafka-topic.consumer.*
> systems.my-kafka-system.streams.my-kafka-topic.producer.*
> {noformat}
> KafkaConfig and KafkaSystemFactory should be updated to handle these.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to