[
https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Elias Levy closed FLINK-9731.
-----------------------------
Resolution: Invalid
> Kafka source subtask begins to consume from earliest offset
> -----------------------------------------------------------
>
> Key: FLINK-9731
> URL: https://issues.apache.org/jira/browse/FLINK-9731
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.2
> Reporter: Elias Levy
> Priority: Critical
>
> On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink
> job instance began consuming records from the earliest offsets available in
> Kafka for the partitions assigned to it. Other subtasks did not exhibit this
> behavior and continued operating normally.
> Prior to the event the job exhibited no Kafka lag. The job showed no
> failed checkpoints, and it did not restore or restart. The Flink logs
> showed only the following message:
> {noformat}
> June 30th 2018, 02:35:01.711 Fetch offset 2340400514 is out of range for
> partition topic-124, resetting offset
> {noformat}
> The job is configured with checkpoints at 1 minute intervals. The Kafka
> connector consumer is configured via `setStartFromGroupOffsets()` to start
> from group offsets when it is not started from a savepoint, and the Kafka
> consumer is configured to fall back to the earliest offsets if no group
> offsets have been committed, by setting `auto.offset.reset` to `earliest`
> in the Kafka consumer config.
> Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership
> of its partitions for around 30 seconds as a result of losing its connection
> to ZooKeeper.
>
> {noformat}
> [2018-06-30 09:34:54,799] INFO Unable to read additional data from server
> sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected)
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for
> partition [cloud_ioc_events,32] to broker
> 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition.
> (kafka.server.ReplicaFetcherThread)
> {noformat}
> The broker reconnected to ZooKeeper after a few tries:
> {noformat}
> [2018-06-30 09:34:55,462] INFO Opening socket connection to server
> 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed)
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,463] INFO Socket connection established to
> 10.210.48.187/10.210.48.187:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service,
> session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired)
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,465] INFO Initiating client connection,
> connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka
> sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9
> (org.apache.zookeeper.ZooKeeper)
> [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service,
> session 0x161305b7bd81a09 has expired, closing socket connection
> (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,466] INFO EventThread shut down for session:
> 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed)
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,468] INFO Opening socket connection to server
> 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,468] INFO Socket connection established to
> 10.210.43.200/10.210.43.200:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,471] INFO Session establishment complete on server
> 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated
> timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker
> 2005 (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure?
> false) (kafka.utils.ZKCheckedEphemeral)
> [2018-06-30 09:34:55,476] INFO Result of znode creation is: OK
> (kafka.utils.ZKCheckedEphemeral)
> [2018-06-30 09:34:55,476] INFO Registered broker 2005 at path
> /brokers/ids/2005 with addresses:
> EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT)
> (kafka.utils.ZkUtils)
> [2018-06-30 09:34:55,476] INFO done re-registering broker
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch
> for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
> {noformat}
> By 9:35:02 partitions had returned to the broker.
> It appears this is the broker that the subtask was consuming from, as its
> outgoing network traffic spiked after the broker recovered leadership of
> its partitions, which is consistent with the subtask starting to consume
> from the earliest offset.
> This may have been related to [Kafka issue
> 5600](https://issues.apache.org/jira/browse/KAFKA-5600), which affects
> 0.11.0.0, the version we are running, and was fixed in 0.11.0.1. But
> that seems unlikely, as the Flink Kafka connector consumer should not make
> use of offsets committed to Kafka when operating with checkpoints enabled,
> nor when the job is not restarting or being restored.
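> To illustrate that last point, a sketch of the relevant connector setting
> (again assuming the Flink 1.4 API; names are hypothetical): with
> checkpointing enabled, the connector restores from offsets stored in the
> checkpoint, and offsets committed back to Kafka serve only as a progress
> indicator for monitoring tools.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class OffsetCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical
        props.setProperty("group.id", "my-consumer-group");        // hypothetical

        FlinkKafkaConsumer011<String> consumer =
            new FlinkKafkaConsumer011<>("topic", new SimpleStringSchema(), props);

        // With checkpointing enabled, offsets committed to Kafka are
        // informational only: on recovery, Flink resumes from the offsets
        // snapshotted in the checkpoint, not the group offsets in Kafka.
        consumer.setCommitOffsetsOnCheckpoints(true);
    }
}
```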
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)