Hi Mike,

FYI, support for "max.poll.records" was added in
https://github.com/apache/kafka/pull/931 (KAFKA-3007), which was not present
in the streams tech preview release. It will, however, be in 0.10.
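Once you're on 0.10 it's just another consumer config. A rough sketch (the
broker list, group id, deserializers and the cap of 500 are placeholders,
not a recommendation):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Cap how many records a single poll() call can hand back
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);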

Cheers,
Geoff

On Thu, Apr 7, 2016 at 4:58 AM, Michael D. Coon <mdco...@yahoo.com.invalid>
wrote:

> One more thing I'm noticing in the logs.
> I see periodic node disconnection messages due to "timeout". I set my
> metadata.fetch.timeout.ms to 60000, request.timeout.ms to 30000, and
> timeout.ms to 30000; those should be more than enough time to wait for
> metadata responses. I also set my offset commit period to 60000, and these
> disconnection messages seem to overlap with the offset commit
> threshold...meaning they seem to happen when the offset commit
> attempts are being made. The "api_key" in the failed request is "1"...I'd
> have to dig into the code to know what that corresponds to.
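> To be concrete, the relevant settings look roughly like this (a sketch; the
> commit.interval.ms mapping for my "offset commit period" is my assumption):
>
>     Properties props = new Properties();
>     // producer metadata wait and request timeouts
>     props.put("metadata.fetch.timeout.ms", "60000");
>     props.put("timeout.ms", "30000");
>     props.put("request.timeout.ms", "30000");
>     // how often offsets get committed (streams commit interval, assumed)
>     props.put("commit.interval.ms", "60000");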
>
>
>
>     On Thursday, April 7, 2016 7:35 AM, Michael D. Coon
> <mdco...@yahoo.com.INVALID> wrote:
>
>
>  Guozhang,
>    Thanks for the advice; however, "max.poll.records" doesn't seem to be
> supported since it's not affecting how many records are coming back from
> the consumer.poll requests. However, I agree that the likely culprit in
> rebalancing is the delay in processing new records. I'm going to try
> playing with the max buffer size per partition setting to see if I can
> force the consumer to pause, and thus not inject too many records too
> quickly. It would be awesome if the max.poll.records setting were respected
> by the consumer/broker so that each poll returned at most that many
> messages. I feel like this used to be supported in the older Kafka APIs.
> This setting would allow more tuning of how much data each of my stream
> job instances receives.
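> To be specific, the knob I plan to tweak is the per-partition buffer limit
> (a sketch; the property name is the one I see in StreamsConfig and the
> value of 100 is just a guess to experiment with):
>
>     Properties props = new Properties();
>     // pause the consumer for a partition once this many records are buffered
>     props.put("buffered.records.per.partition", 100);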
>
> Mike
>
>
>     On Wednesday, April 6, 2016 5:55 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
>
>  Hi Michael,
> Your issue seems like a more general one with the new Kafka Consumer
> regarding unexpected rebalances. As for Kafka Streams, its committing
> behavior is synchronous, i.e. it triggers "consumer.commitSync" on the
> underlying new Kafka Consumer, which will fail if there is an ongoing
> rebalance, since the partitions being committed may no longer be owned by
> the consumer.
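> With the plain new consumer the failure surfaces roughly like this (a
> sketch, not the actual streams internals):
>
>     import org.apache.kafka.clients.consumer.CommitFailedException;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>
>     void commitCurrentOffsets(KafkaConsumer<byte[], byte[]> consumer) {
>         try {
>             // synchronous commit of the consumer's current positions
>             consumer.commitSync();
>         } catch (CommitFailedException e) {
>             // the group rebalanced in the meantime, so the partitions being
>             // committed may no longer be assigned to this consumer
>             System.err.println("Commit failed due to rebalance: " + e);
>         }
>     }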
> As for rebalance, there are several cases that can cause it:
> 1) topic changes, like creation of new topics, partition addition, topic
> deletes, etc.
> If you are not changing topics at the moment then you can exclude this
> case.
> 2) consumer failures detected by the heartbeat protocol, which causes the
> failed consumer's partitions to be migrated to other members of the group.
> Note that the heartbeat is wrapped in the poll() protocol, so if your
> consumer thread (and similarly Kafka Streams) takes a long time to process
> polled records and your configured session.timeout.ms value is not large
> enough, the consumer will be considered failed and a rebalance triggered.
> So you can consider 1) increasing the session.timeout.ms value, or 2)
> setting max.poll.records to a reasonably small value, to avoid your
> consumers being falsely considered failed.
> More info about the consumer configs:
> http://kafka.apache.org/documentation.html#newconsumerconfigs
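> In config terms that means something like the following (the values here
> are just examples to illustrate the direction, not recommendations):
>
>     Properties props = new Properties();
>     // give slow record processing more headroom before the coordinator
>     // considers this consumer dead
>     props.put("session.timeout.ms", "60000");
>     // bound the work done between poll() calls, and hence between heartbeats
>     props.put("max.poll.records", "100");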
>
>
> Guozhang
>
> On Wed, Apr 6, 2016 at 6:26 AM, Michael D. Coon <mdco...@yahoo.com.invalid>
> wrote:
>
> All,
>    I'm getting CommitFailedExceptions on a small prototype I built using
> Kafka Streams. I'm not using the DSL, but the TopologyBuilder with several
> processors chained together with a sink in between a few of them. When I
> try committing through the ProcessorContext, I see exceptions being thrown
> about commit failures due to group rebalance (not to mention delays in
> processing during commit attempts). I'm using a single host, with 2 stream
> threads and a 5-node Kafka cluster. I wouldn't think rebalancing would be
> occurring after data starts flowing and I'm committing offsets. This was
> something I saw with the new Kafka client APIs as well, and I had to work
> around it by creating static partition assignments for my consumers in my data
> flow...otherwise, any dynamic allocation of new consumers to the group
> caused this exception to be thrown and I could never commit my offsets. Are
> you all not seeing this in your tests? Am I not supposed to commit through
> the ProcessorContext? I'm committing once my interim processor writes its
> output to the sink point in the flow; is that not the correct/expected
> behavior/use?
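> For reference, the pattern in my interim processor is essentially this
> stripped-down sketch (not my real code):
>
>     import org.apache.kafka.streams.processor.Processor;
>     import org.apache.kafka.streams.processor.ProcessorContext;
>
>     public class ForwardAndCommitProcessor implements Processor<String, String> {
>         private ProcessorContext context;
>
>         @Override
>         public void init(ProcessorContext context) { this.context = context; }
>
>         @Override
>         public void process(String key, String value) {
>             context.forward(key, value); // hand off to the downstream sink
>             context.commit();            // ask for the offsets to be committed
>         }
>
>         @Override
>         public void punctuate(long timestamp) { }
>
>         @Override
>         public void close() { }
>     }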
> Mike
>
> --
> -- Guozhang
>



-- 
Geoff Anderson | Software Engineer | Confluent | +1 612.968.7340
Download Apache Kafka and Confluent Platform: www.confluent.io/download
