[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Melmoth updated FLINK-4618:
---------------------------
Description:
There seems to be an issue with the offset management in Flink. When a job is
stopped and started again, the last message consumed before the stop is read
again. I enabled checkpointing (EXACTLY_ONCE) and the FsStateBackend. Here is my
log output. You can clearly see that the consumer waits for a new record at
offset 4848911, which is correct. After restarting, it consumes the record at
4848910 again, so that record is consumed more than once.
I started with a new consumer group and emitted one record.
I checked the offset with the Kafka command-line tools: the committed offset in
ZooKeeper is 4848910.
{code}
10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667.
10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0]
10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646
10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No committed offset for partition myTopic-0
10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to latest offset.
10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 4848910 for partition myTopic-0
10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
-- Inserting a new event here
10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
10:30:23,887 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1473841823887
10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
10:30:23,996 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 96 ms)
10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667)
10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Committed offset 4848910 for partition myTopic-0
10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
10:30:48,057 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:2946
-- Restarting job
10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667.
10:32:01,673 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0]
10:32:01,677 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646
// See below! Shouldn't the offset be 4848911?
10:32:01,682 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to the committed offset 4848910
10:32:01,683 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
10:32:01,685 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 1001 at hdp1:6667.
10:32:01,687 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 1001
// Here record 4848910 gets consumed again!
10:32:01,707 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
10:32:01,708 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
10:32:03,721 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
10:32:04,224 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
10:32:04,726 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
10:32:04,894 INFO org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache
10:32:04,903 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:3079
{code}
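For reference, the Kafka consumer Javadoc for KafkaConsumer.commitSync(Map) says the committed offset should be the next message the application will consume, i.e. lastProcessedMessageOffset + 1. In the log above the consumer processes 4848910 and moves its position to 4848911, yet 4848910 is what gets committed, so the restarted consumer resumes at 4848910. The following is only a hypothetical, dependency-free simulation of that arithmetic (the class and method names are made up for illustration and are not Flink or Kafka API), comparing the two commit choices:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the committed-offset convention; not Flink or Kafka code.
final class OffsetCommitDemo {

    // Kafka convention (per the commitSync Javadoc): commit the offset of the
    // NEXT record to consume, i.e. the last processed offset + 1.
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    // Offsets a restarted consumer reads if it resumes at committedOffset and
    // logEndOffset is the offset the next produced record would receive.
    static List<Long> offsetsReadAfterRestart(long committedOffset, long logEndOffset) {
        List<Long> read = new ArrayList<>();
        for (long offset = committedOffset; offset < logEndOffset; offset++) {
            read.add(offset);
        }
        return read;
    }

    public static void main(String[] args) {
        long lastProcessed = 4848910L;   // the single record emitted in this report
        long logEnd = lastProcessed + 1; // nothing new was produced before the restart

        // What the log shows being committed: the processed offset itself.
        List<Long> committedProcessed = offsetsReadAfterRestart(lastProcessed, logEnd);
        // What the Kafka convention prescribes: the next offset to read.
        List<Long> committedNext = offsetsReadAfterRestart(offsetToCommit(lastProcessed), logEnd);

        System.out.println("commit " + lastProcessed + " -> re-read " + committedProcessed);
        System.out.println("commit " + offsetToCommit(lastProcessed) + " -> re-read " + committedNext);
    }
}
{code}

Running it prints "commit 4848910 -> re-read [4848910]" followed by "commit 4848911 -> re-read []", which matches the duplicate read seen after the restart when the processed offset, rather than the consumer position, is committed.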
> Last kafka message gets consumed twice when restarting job
> ----------------------------------------------------------
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
> Reporter: Melmoth
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)