Hi Mike,

FYI, support for "max.poll.records" was added in https://github.com/apache/kafka/pull/931 (KAFKA-3007), which was not present in the streams tech preview release. It will, however, be in 0.10.
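Once you're on 0.10 it should just be another consumer property. A minimal sketch of the plain-consumer side (the broker address, group id, and the values below are placeholders, not recommendations):

    // java.util.Properties + org.apache.kafka.clients.consumer.KafkaConsumer
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");   // placeholder
    props.put("group.id", "my-stream-job");           // placeholder
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("max.poll.records", "500");       // new in 0.10: cap on the records returned per poll()
    props.put("session.timeout.ms", "30000");   // more headroom for processing between polls
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);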
Cheers,
Geoff

On Thu, Apr 7, 2016 at 4:58 AM, Michael D. Coon <mdco...@yahoo.com.invalid> wrote:

> One more thing I'm noticing in the logs. I see periodic node disconnection messages due to "timeout". I set my metadata.fetch.timeout.ms to 60000, request.timeout.ms to 30000, and timeout.ms to 30000, and those should be more than enough time waiting for metadata responses. I also set my offset commit period to 60000, and these disconnection messages seem to overlap with the offset commit threshold...meaning it seems to be happening when the offset commit attempts are being made. The "api_key" in the failed request is "1"...I'd have to dig into the code to know what that corresponds to.
>
> On Thursday, April 7, 2016 7:35 AM, Michael D. Coon <mdco...@yahoo.com.INVALID> wrote:
>
> Guozhang,
> Thanks for the advice; however, "max.poll.records" doesn't seem to be supported, since it isn't affecting how many records come back from the consumer.poll requests. However, I agree that the likely culprit in the rebalancing is the delay in processing new records. I'm going to try to play with the max buffer size per partition setting to see if I can force the consumer to pause, and thus not inject too many records too quickly. It would be awesome if the max.poll.records setting were respected by the consumer/broker and it returned a max number of messages. I feel like this used to be supported in the older Kafka APIs. This setting would allow more tuning of how much data each of my stream job instances receives.
> Mike
>
> On Wednesday, April 6, 2016 5:55 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> Hi Michael,
> Your issue seems like a more general one with the new Kafka Consumer regarding unexpected rebalances: as for Kafka Streams, its committing behavior is synchronous, i.e. it triggers "consumer.commitSync" on the underlying new Kafka Consumer, which will fail if there is an ongoing rebalance, since the partitions being committed may not be owned by the consumer anymore.
> As for rebalances, there are several cases that can cause them:
> 1) Topic changes, like creation of new topics, partition addition, topic deletes, etc. If you are not changing topics at the moment, then you can exclude this case.
> 2) Consumer failures detected by the heartbeat protocol, which migrate partitions out of the failed consumer. Note that the heartbeat is wrapped in the poll() protocol, so if your consumer thread (and similarly Kafka Streams) takes a long time to process polled records while your configured session.timeout.ms value is not large enough, the consumer can be falsely considered failed and a rebalance is triggered.
> So you can consider 1) increasing the session.timeout.ms value, and 2) setting max.poll.records to a reasonably small value, to avoid your consumers being falsely considered failed.
> More info about the consumer configs:
> http://kafka.apache.org/documentation.html#newconsumerconfigs
>
> Guozhang
>
> On Wed, Apr 6, 2016 at 6:26 AM, Michael D. Coon <mdco...@yahoo.com.invalid> wrote:
>
> All,
> I'm getting CommitFailedExceptions on a small prototype I built using Kafka Streams. I'm not using the DSL, but the TopologyBuilder, with several processors chained together and a sink in between a few of them. When I try committing through the ProcessorContext, I see exceptions being thrown about commit failures due to group rebalance (not to mention delays in processing during commit attempts). I'm using a single host, with 2 stream threads and a 5-node Kafka cluster. I wouldn't think rebalancing would be occurring after data starts flowing and I'm committing offsets. This was something I saw with the new Kafka client APIs as well, and I had to work around it by creating static partition assignments for my consumers in my data flow...otherwise, any dynamic allocation of new consumers to the group caused this exception to be thrown and I could never commit my offsets. Are you all not seeing this in your tests? Am I not supposed to commit through the ProcessorContext? I'm committing once my interim processor writes its output to the sink point in the flow; is that not the correct/expected behavior/use?
> Mike
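For anyone following along, the kind of setup Michael describes (Processor API rather than the DSL, processors chained with a sink in the middle, committing via the ProcessorContext) looks roughly like the sketch below. All processor names, topics, and the per-record commit are made up for illustration:

    // A sketch only. Classes come from org.apache.kafka.streams.processor.
    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("source", "input-topic");
    builder.addProcessor("enrich", EnrichProcessor::new, "source");
    builder.addSink("intermediate-sink", "intermediate-topic", "enrich");  // the sink "in between"
    builder.addProcessor("aggregate", AggregateProcessor::new, "enrich");  // not shown; same shape as below
    builder.addSink("final-sink", "output-topic", "aggregate");

    class EnrichProcessor implements Processor<byte[], byte[]> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(byte[] key, byte[] value) {
            context.forward(key, value);  // hand the record to the downstream sink/processor
            // Commit through the ProcessorContext once the output has been forwarded.
            // Per Guozhang's note above, this goes through consumer.commitSync(), which is
            // what fails while the group is rebalancing. (Illustrative only; a real
            // processor would commit far less often than once per record.)
            context.commit();
        }

        @Override
        public void punctuate(long timestamp) { }

        @Override
        public void close() { }
    }

So a long gap between polls (slow processing against a too-small session.timeout.ms) can trigger the rebalance, and the commit issued through the ProcessorContext is then the call that surfaces the CommitFailedException.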
> --
> -- Guozhang

--
Geoff Anderson | Software Engineer | Confluent | +1 612.968.7340
Download Apache Kafka and Confluent Platform: www.confluent.io/download