RivenSun created KAFKA-13694:
--------------------------------

             Summary: Some InvalidRecordException messages are thrown away
                 Key: KAFKA-13694
                 URL: https://issues.apache.org/jira/browse/KAFKA-13694
             Project: Kafka
          Issue Type: Improvement
          Components: clients, core
    Affects Versions: 3.0.0
            Reporter: RivenSun


1. Example

Topic-level config: "cleanup.policy":"compact"

However, when the producer sends a message, its ProducerRecord does not specify
a key.
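The rejection comes from the broker's key check. Log compaction keeps the latest value for each key, so a compacted topic cannot accept a record without a key. A minimal sketch of that rule, assuming a simplified signature (the `KeyCheckSketch` class, this `validateKey(boolean, byte[])` signature and the error text are hypothetical stand-ins, not Kafka's actual LogValidator code):

```java
import java.util.Optional;

// Hypothetical, simplified model of the broker-side key check for compacted topics.
// This is NOT Kafka's LogValidator code; names, signature and error text are illustrative only.
class KeyCheckSketch {

    // Returns an error description if the record must be rejected, or empty if it is accepted.
    static Optional<String> validateKey(boolean compactedTopic, byte[] key) {
        if (compactedTopic && key == null) {
            // Compaction retains the latest value per key, so a keyless record cannot be kept.
            return Optional.of("Compacted topic cannot accept message without key");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Topic with "cleanup.policy":"compact", ProducerRecord sent without a key.
        System.out.println(validateKey(true, null).orElse("accepted"));
        // The same keyless record on a topic without compaction.
        System.out.println(validateKey(false, null).orElse("accepted"));
    }
}
```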

 

producer.log
{code:java}
[kafka-producer-network-thread | producer-1] ERROR 
us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One or 
more records have been rejected {code}
 

 

server.log
{code:java}
[2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: One or more records have been 
rejected {code}
From the producer and server logs, we cannot tell why the send failed, only
that the server rejected the records.
Compare this with the RecordTooLargeException test case: there, the producer log
clearly states the reason for the failure, and the server does not log anything
(the reason is explained later).
producer_message_too_large.log:
{code:java}
[kafka-producer-network-thread | producer-1] ERROR 
us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
request included a message larger than the max message size the server will 
accept.
[kafka-producer-network-thread | producer-1] ERROR 
us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The 
request included a message larger than the max message size the server will 
accept. {code}

2. Root Cause

ReplicaManager#appendToLocalLog(...) ->

Partition#appendRecordsToLeader(...) ->

UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) ->

LogValidator#validateMessagesAndAssignOffsets(...) 

1) Analyzing the validateMessagesAndAssignOffsets method:
the LogValidator#validateRecord method calls validateKey and validateTimestamp,
and the error information of all records is collected as a
Seq[ApiRecordError].
The subsequent processRecordErrors(recordErrors) method currently special-cases
only Errors.INVALID_TIMESTAMP. Because validateKey still returns the generic
Errors.INVALID_RECORD, the code falls through to:
{code:java}
else {
  throw new RecordValidationException(new InvalidRecordException(
    "One or more records have been rejected"), errors)
}{code}
In fact, the *errors* variable here contains the details of each record error,
but this information is not included in the message of the
InvalidRecordException.
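The current control flow can be modelled in a few lines. This is a minimal Java sketch that assumes simplified stand-ins for Errors, ApiRecordError, RecordError, InvalidRecordException and RecordValidationException (the real classes live in Kafka's clients and core modules and have different constructors); the INVALID_TIMESTAMP branch uses a placeholder message and exception type because its exact form is not discussed here.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified stand-ins for the Kafka types named above; not the real classes.
class CurrentBehaviourSketch {

    enum Errors { INVALID_TIMESTAMP, INVALID_RECORD }

    static final class RecordError {
        final int batchIndex;
        final String message;

        RecordError(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }
    }

    static final class ApiRecordError {
        final Errors apiError;
        final RecordError recordError;

        ApiRecordError(Errors apiError, RecordError recordError) {
            this.apiError = apiError;
            this.recordError = recordError;
        }
    }

    static final class InvalidRecordException extends RuntimeException {
        InvalidRecordException(String message) {
            super(message);
        }
    }

    static final class RecordValidationException extends RuntimeException {
        final RuntimeException invalidException;
        final List<RecordError> recordErrors;

        RecordValidationException(RuntimeException invalidException, List<RecordError> recordErrors) {
            super(invalidException);
            this.invalidException = invalidException;
            this.recordErrors = recordErrors;
        }
    }

    static void processRecordErrors(List<ApiRecordError> recordErrors) {
        if (recordErrors.isEmpty()) {
            return;
        }
        List<RecordError> errors = recordErrors.stream()
                .map(e -> e.recordError)
                .collect(Collectors.toList());
        if (recordErrors.stream().anyMatch(e -> e.apiError == Errors.INVALID_TIMESTAMP)) {
            // The only special-cased type (placeholder; real exception type and text may differ).
            throw new RecordValidationException(new InvalidRecordException("<timestamp-specific message>"), errors);
        }
        // Everything else, including a missing key, lands here: the per-record details
        // travel in `errors`, but the exception message does not mention them.
        throw new RecordValidationException(
                new InvalidRecordException("One or more records have been rejected"), errors);
    }

    public static void main(String[] args) {
        List<ApiRecordError> input = List.of(new ApiRecordError(Errors.INVALID_RECORD,
                new RecordError(0, "Compacted topic cannot accept message without key")));
        try {
            processRecordErrors(input);
        } catch (RecordValidationException rve) {
            System.out.println(rve.invalidException.getMessage()); // One or more records have been rejected
            System.out.println(rve.recordErrors.get(0).message);   // the detail that is not surfaced
        }
    }
}
```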

2) The exception thrown by processRecordErrors is caught by
ReplicaManager#appendToLocalLog(...). Next, we analyze the catch block of
appendToLocalLog.



Here we can also see why the server does not log anything for
RecordTooLargeException.

In the case rve: RecordValidationException branch, the server logs the error in
the processFailedRecord method and builds the response to the client in the
LogAppendResult method.
Both methods use only rve.invalidException; rve.recordErrors is neither logged
by the server nor returned to the client.
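The branch described above can be sketched as follows, assuming hypothetical stand-ins: `AppendCatchSketch`, a `LogAppendResult` that carries only an error message, and an in-memory `serverLog` list in place of server.log. It mirrors the analysis in this issue rather than the exact ReplicaManager code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the `case rve: RecordValidationException` branch in
// ReplicaManager#appendToLocalLog, following the analysis in this issue.
class AppendCatchSketch {

    static final class RecordError {
        final int batchIndex;
        final String message;

        RecordError(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }
    }

    static final class RecordValidationException extends RuntimeException {
        final RuntimeException invalidException;
        final List<RecordError> recordErrors;

        RecordValidationException(RuntimeException invalidException, List<RecordError> recordErrors) {
            super(invalidException);
            this.invalidException = invalidException;
            this.recordErrors = recordErrors;
        }
    }

    // Simplified result: only the error message that reaches the client.
    static final class LogAppendResult {
        final String errorMessage;

        LogAppendResult(String errorMessage) {
            this.errorMessage = errorMessage;
        }
    }

    // In-memory stand-in for server.log.
    static final List<String> serverLog = new ArrayList<>();

    static LogAppendResult handle(RecordValidationException rve) {
        // processFailedRecord: only rve.invalidException is logged.
        serverLog.add("ERROR Error processing append operation: " + rve.invalidException.getMessage());
        // LogAppendResult: built from rve.invalidException only; rve.recordErrors is never read.
        return new LogAppendResult(rve.invalidException.getMessage());
    }

    public static void main(String[] args) {
        RecordValidationException rve = new RecordValidationException(
                new RuntimeException("One or more records have been rejected"),
                List.of(new RecordError(0, "Compacted topic cannot accept message without key")));
        LogAppendResult result = handle(rve);
        System.out.println(serverLog.get(0));
        System.out.println(result.errorMessage);
    }
}
```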

3. Solution
There are two possible solutions; I prefer the second.

1) Similar to Errors.INVALID_TIMESTAMP, make the validateKey method return a new
error, Errors.INVALID_RECORD_WITHOUT_KEY, and add special handling for
Errors.INVALID_RECORD_WITHOUT_KEY in the processRecordErrors method.
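Solution 1 could look roughly like this. It is a sketch under assumptions: INVALID_RECORD_WITHOUT_KEY is the proposed error code and does not exist yet, the new exception text is hypothetical, and all types are simplified stand-ins for the Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Sketch of solution 1: a dedicated error code for keyless records on compacted topics.
// INVALID_RECORD_WITHOUT_KEY is the *proposed* code; the message text is hypothetical.
class SolutionOneSketch {

    enum Errors { INVALID_TIMESTAMP, INVALID_RECORD, INVALID_RECORD_WITHOUT_KEY }

    static final class ApiRecordError {
        final Errors apiError;
        final int batchIndex;
        final String message;

        ApiRecordError(Errors apiError, int batchIndex, String message) {
            this.apiError = apiError;
            this.batchIndex = batchIndex;
            this.message = message;
        }
    }

    // validateKey reports the new, specific error code instead of INVALID_RECORD.
    static Optional<ApiRecordError> validateKey(int batchIndex, boolean compactedTopic, byte[] key) {
        if (compactedTopic && key == null) {
            return Optional.of(new ApiRecordError(Errors.INVALID_RECORD_WITHOUT_KEY, batchIndex,
                    "Compacted topic cannot accept message without key"));
        }
        return Optional.empty();
    }

    // Message the thrown exception would carry; one branch per special-cased error type.
    static String rejectionMessage(List<ApiRecordError> recordErrors) {
        if (recordErrors.stream().anyMatch(e -> e.apiError == Errors.INVALID_TIMESTAMP)) {
            return "<timestamp-specific message>"; // placeholder for the existing special case
        }
        if (recordErrors.stream().anyMatch(e -> e.apiError == Errors.INVALID_RECORD_WITHOUT_KEY)) {
            return "One or more records have been rejected because they have no key";
        }
        return "One or more records have been rejected";
    }

    public static void main(String[] args) {
        List<ApiRecordError> errors = new ArrayList<>();
        validateKey(0, true, null).ifPresent(errors::add);
        System.out.println(rejectionMessage(errors));
    }
}
```

The drawback shows up in `rejectionMessage`: every new INVALID_RECORD variant needs another branch, which is why the second solution is preferred.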

2) Modify the processRecordErrors method so that it no longer distinguishes
between error types. That way, {*}even if new INVALID_RECORD types are added in
the future{*}, we uniformly throw:
{code:java}
throw new RecordValidationException(new InvalidRecordException(
  "One or more records have been rejected due to " + errors.toString()), errors) {code}


We also need to add a toString() method to the ProduceResponse.RecordError class:
{code:java}
@Override
public String toString() {
    return "RecordError("
            + "batchIndex=" + batchIndex
            + ", message=" + ((message == null) ? "null" : "'" + message + "'")
            + ")";
} {code}
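With both changes in place (the uniform message from solution 2 and the toString() above), the exception message carries every record error. A minimal Java sketch, assuming a simplified stand-in for ProduceResponse.RecordError and a hypothetical `rejectionMessage` helper in place of processRecordErrors. Note that in the Scala broker code `errors` is a Seq, so its toString() uses Scala's collection format (for example `List(...)`) rather than Java's `[...]`.

```java
import java.util.List;

// Sketch of solution 2 combined with the proposed RecordError#toString().
// RecordError is a simplified stand-in; rejectionMessage is a hypothetical helper.
class SolutionTwoSketch {

    static final class RecordError {
        final int batchIndex;
        final String message;

        RecordError(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }

        // Same format as the toString() proposed for ProduceResponse.RecordError.
        @Override
        public String toString() {
            return "RecordError("
                    + "batchIndex=" + batchIndex
                    + ", message=" + ((message == null) ? "null" : "'" + message + "'")
                    + ")";
        }
    }

    // No branching on error type: the details are always part of the message.
    static String rejectionMessage(List<RecordError> errors) {
        return "One or more records have been rejected due to " + errors.toString();
    }

    public static void main(String[] args) {
        List<RecordError> errors = List.of(
                new RecordError(0, "Compacted topic cannot accept message without key"));
        System.out.println(rejectionMessage(errors));
    }
}
```

Running `main` prints:
One or more records have been rejected due to [RecordError(batchIndex=0, message='Compacted topic cannot accept message without key')]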
The toString method of ProduceResponse.PartitionResponse already calls the
toString method of ProduceResponse.RecordError, *but the RecordError#toString
method has so far been missing.*
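The effect of the missing method is easy to reproduce: without an override, Object#toString() prints the class name and a hash code, so a containing object such as PartitionResponse that prints its record errors shows nothing useful. A self-contained sketch with two hypothetical stand-in classes:

```java
import java.util.List;

// Compares how a list of record errors is printed with and without a toString() override.
// Both classes are hypothetical stand-ins for ProduceResponse.RecordError.
class ToStringEffectSketch {

    // No override: falls back to Object#toString(), i.e. ClassName@hexHashCode.
    static final class RecordErrorWithoutToString {
        final int batchIndex;
        final String message;

        RecordErrorWithoutToString(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }
    }

    // With the proposed override.
    static final class RecordErrorWithToString {
        final int batchIndex;
        final String message;

        RecordErrorWithToString(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }

        @Override
        public String toString() {
            return "RecordError("
                    + "batchIndex=" + batchIndex
                    + ", message=" + ((message == null) ? "null" : "'" + message + "'")
                    + ")";
        }
    }

    public static void main(String[] args) {
        // Prints something like recordErrors=[ToStringEffectSketch$RecordErrorWithoutToString@1b6d3586]
        System.out.println("recordErrors=" + List.of(new RecordErrorWithoutToString(0, "no key")));
        // Prints recordErrors=[RecordError(batchIndex=0, message='no key')]
        System.out.println("recordErrors=" + List.of(new RecordErrorWithToString(0, "no key")));
    }
}
```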

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
