[
https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725223#comment-17725223
]
Yash Mayya commented on KAFKA-15012:
------------------------------------
Thanks for filing this Jira [~ranjanrao]. Simply enabling the
ALLOW_LEADING_ZEROS_FOR_NUMBERS feature would likely be a backward-incompatible
change, since some users could be relying on the existing behavior to send bad
data (i.e. a numeric field that has leading zeroes when it isn't expected to)
to a DLQ topic. This might need a small KIP adding a new config (maybe
allow.leading.zeroes.for.numbers) to the JsonConverter that defaults to false
in order to remain backward compatible. Alternatively, we could design a way to
let users configure the various
[JsonReadFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonReadFeature.html]s
and
[JsonWriteFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonWriteFeature.html]s
for the JsonSerializer / JsonDeserializer used in the JsonConverter.
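For reference, a rough sketch of how such a flag might be wired up, assuming Jackson 2.10+ (where JsonReadFeature was introduced); the allowLeadingZeroes parameter below is hypothetical and would come from the proposed converter config, it is not an existing JsonConverter option:
{code:java}
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LeadingZerosConfigSketch {

    // Hypothetical: build the ObjectMapper used by the deserializer, optionally
    // relaxing Jackson's strict rejection of leading zeroes in numeric values.
    static ObjectMapper buildMapper(boolean allowLeadingZeroes) {
        ObjectMapper mapper = new ObjectMapper();
        if (allowLeadingZeroes) {
            // mappedFeature() translates the JsonReadFeature into the equivalent
            // JsonParser.Feature (ALLOW_NUMERIC_LEADING_ZEROS) accepted by ObjectMapper#enable.
            mapper.enable(JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS.mappedFeature());
        }
        return mapper;
    }
}
{code}
With the flag defaulting to false, pipelines that currently rely on such records being rejected (and routed to a DLQ) would keep today's behavior.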
> JsonConverter fails when there are leading Zeros in a field
> -----------------------------------------------------------
>
> Key: KAFKA-15012
> URL: https://issues.apache.org/jira/browse/KAFKA-15012
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.4.0, 3.3.2
> Reporter: Ranjan Rao
> Priority: Major
> Attachments: enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch
>
>
> When there are leading zeros in a field in the Kafka record, a sink connector
> using JsonConverter fails with the below exception:
>
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
> at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
> at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
> at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
> at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494)
> at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
> at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
> ... 13 more
> Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading zeroes not allowed
> at [Source: (byte[])"00080153032837"; line: 1, column: 2]
> Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading zeroes not allowed
> at [Source: (byte[])"00080153032837"; line: 1, column: 2]
> at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
> at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
> at com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551)
> at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520)
> at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1372)
> at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:855)
> at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
> at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4247)
> at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734)
> at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
> at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
> at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494)
> at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
> at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
> at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
>
>
> To resolve the issue, we need to add the below line in the
> JsonSerializer.java and JsonDeserializer.java classes:
> {code:java}
> objectMapper.enable(JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS.mappedFeature());
> {code}
> Attaching a patch file showing the changes here.
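> For illustration, a minimal standalone sketch (plain Jackson, outside Connect) of what this feature changes when parsing the failing record key from the stack trace above; note that the leading zeroes are dropped once the value is parsed as a number:
> {code:java}
> import com.fasterxml.jackson.core.json.JsonReadFeature;
> import com.fasterxml.jackson.databind.ObjectMapper;
> import java.nio.charset.StandardCharsets;
>
> public class LeadingZerosDemo {
>     public static void main(String[] args) throws Exception {
>         byte[] payload = "00080153032837".getBytes(StandardCharsets.UTF_8);
>
>         // Default (strict) mapper: rejects the payload, which is what surfaces
>         // as the SerializationException in the stack trace above.
>         ObjectMapper strict = new ObjectMapper();
>         try {
>             strict.readTree(payload);
>         } catch (Exception e) {
>             System.out.println("strict: " + e.getMessage()); // Invalid numeric value: Leading zeroes not allowed
>         }
>
>         // Mapper with the feature enabled: parses the value as the number 80153032837.
>         ObjectMapper lenient = new ObjectMapper();
>         lenient.enable(JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS.mappedFeature());
>         System.out.println("lenient: " + lenient.readTree(payload));
>     }
> }
> {code}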
>