Matt Wang created KAFKA-6662:
--------------------------------
Summary: Consumer use offsetsForTimes() get offset return None.
Key: KAFKA-6662
URL: https://issues.apache.org/jira/browse/KAFKA-6662
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.10.2.0
Reporter: Matt Wang
When we call the consumer's offsetsForTimes() method to look up the offset for
a topic-partition, it sometimes returns null. The client log shows:
{code:java}
[2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop
interval 256 upload 0 retry 0 fail 0
(com.meituan.mtrace.collector.sg.AbstractCollector)
[2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53.
(org.apache.kafka.clients.NetworkClient)
[2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: (Produce(0):
0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable:
1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0],
StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3],
ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2],
OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0],
JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0],
LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0],
DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0],
SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0],
CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0])
(org.apache.kafka.clients.NetworkClient)
[2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for
org.matt_test2-0. Fetched offset -1, timestamp -1
(org.apache.kafka.clients.consumer.internals.Fetcher){code}
From the log, we can see that the broker does return an offset, but its value
is -1. This value is then dropped in Fetcher.handleListOffsetResponse():
{code:java}
// Handle v1 and later response
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
        topicPartition, partitionData.offset, partitionData.timestamp);
if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
    OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
    timestampOffsetMap.put(topicPartition, offsetData);
}{code}
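The effect of this check can be reproduced in isolation. The following is a minimal sketch, not the actual Fetcher code: ListOffsetFilterDemo, collect() and the use of plain strings as partition names are hypothetical stand-ins, and UNKNOWN_OFFSET is assumed to be -1, as the log above suggests. It shows that a partition whose response carries offset -1 never reaches the result map, so a later lookup for it yields null.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the filtering step quoted above; not Kafka's real classes.
final class ListOffsetFilterDemo {
    // Assumed sentinel value, matching the "Fetched offset -1" log line.
    static final long UNKNOWN_OFFSET = -1L;

    // Copies only the partitions with a known offset into the result map.
    static Map<String, Long> collect(Map<String, Long> brokerResponse) {
        Map<String, Long> timestampOffsetMap = new HashMap<>();
        for (Map.Entry<String, Long> entry : brokerResponse.entrySet()) {
            if (entry.getValue() != UNKNOWN_OFFSET) {
                timestampOffsetMap.put(entry.getKey(), entry.getValue());
            }
        }
        return timestampOffsetMap;
    }
}
```

Passing a response that maps "org.matt_test2-0" to -1 produces a map without that key, so get("org.matt_test2-0") returns null, which mirrors what the consumer client observes.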
We tested several situations and found that null is returned in the following
two cases:
# The topic-partition contains no messages. When we call offsetsForTimes() to
get the offset, the broker returns -1.
# The targetTime used for the lookup is larger than the largestTimestamp of the
partition's active segment. The broker returns -1.
If the offset is -1, it is not returned to the consumer client. I think that in
these situations the latest offset should be returned instead, which is also
what the doc comment in kafka/core describes:
{code:java}
/**
 * Search the message offset based on timestamp.
 * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
 * greater than or equals to the target timestamp.
 *
 * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
 * timestamp will be max timestamp in the segment.
 *
 * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
 * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
 *
 * This methods only returns None when the log is not empty but we did not see any messages when scanning the log
 * from the indexed position. This could happen if the log is truncated after we get the indexed position but
 * before we scan the log from there. In this case we simply return None and the caller will need to check on
 * the truncated log and maybe retry or even do the search on another log segment.
 *
 * @param timestamp The timestamp to search for.
 * @return the timestamp and offset of the first message whose timestamp is larger than or equals to the
 *         target timestamp. None will be returned if there is no such message.
 */
def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
  // Get the index entry with a timestamp less than or equal to the target timestamp
  val timestampOffset = timeIndex.lookup(timestamp)
  val position = index.lookup(timestampOffset.offset).position
  // Search the timestamp
  log.searchForTimestamp(timestamp, position)
}
{code}
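Until the behavior changes, a caller could apply the proposed fallback on the client side. The sketch below is only an illustration under stated assumptions: OffsetFallback and resolve() are hypothetical names, partitions are represented as plain strings, and the two input maps stand in for the result of offsetsForTimes() (reduced to offsets, with null or missing entries for unresolved partitions) and the result of the consumer's endOffsets() call.

```java
import java.util.HashMap;
import java.util.Map;

// Client-side fallback sketch; hypothetical helper, not part of the Kafka API.
final class OffsetFallback {
    // For each partition, use the timestamp-based offset when present,
    // otherwise fall back to the partition's end (latest) offset.
    static Map<String, Long> resolve(Map<String, Long> offsetsByTime,
                                     Map<String, Long> endOffsets) {
        Map<String, Long> resolved = new HashMap<>();
        for (Map.Entry<String, Long> end : endOffsets.entrySet()) {
            Long byTime = offsetsByTime.get(end.getKey());
            resolved.put(end.getKey(), byTime != null ? byTime : end.getValue());
        }
        return resolved;
    }
}
```

With this helper, a partition for which the timestamp lookup produced null resolves to its end offset, while partitions with a valid lookup result keep that result.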