Mateusz created BEAM-7978:
-----------------------------
Summary: ArithmeticExceptions on getting backlog bytes
Key: BEAM-7978
URL: https://issues.apache.org/jira/browse/BEAM-7978
Project: Beam
Issue Type: Bug
Components: io-java-kinesis
Affects Versions: 2.14.0
Reporter: Mateusz
Hello,
Beam 2.14.0 (more precisely,
[commit|https://github.com/apache/beam/commit/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad#diff-b4964a457006b1555c7042c739b405ec])
introduced a change to the watermark calculation in KinesisIO that causes the
following error:
{code:java}
exception: "java.lang.RuntimeException: Unknown kinesis failure, when trying to reach kinesis
    at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:227)
    at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:167)
    at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:155)
    at org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:158)
    at org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:433)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1289)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1024)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArithmeticException: Value cannot fit in an int: 153748963401
    at org.joda.time.field.FieldUtils.safeToInt(FieldUtils.java:229)
    at org.joda.time.field.BaseDurationField.getDifference(BaseDurationField.java:141)
    at org.joda.time.base.BaseSingleFieldPeriod.between(BaseSingleFieldPeriod.java:72)
    at org.joda.time.Minutes.minutesBetween(Minutes.java:101)
    at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getBacklogBytes$3(SimplifiedKinesisClient.java:169)
    at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
    ... 10 more
{code}
We spotted this issue on the Dataflow runner. It is problematic because the
failure to get backlog bytes appears to cause the KinesisReader to be recreated
over and over.
The issue occurs when the backlog bytes are retrieved before the watermark has
been updated from its initial default value. An easy way to reproduce it is to
create a pipeline with a Kinesis source reading from a stream to which no
records are being put. While debugging locally, you can observe that the
watermark is set to a value far in the past (e.g. "-290308-12-21T19:59:05.225Z").
After two minutes (the default watermark idle duration threshold), the
watermark is set to the value of
[watermarkIdleThreshold|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyFactory.java#L110],
so the next backlog bytes retrieval should succeed. However, as described
above, on the Dataflow runner the KinesisReader is closed right after it is
created, so the watermark never gets corrected.
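To make the idle-threshold behaviour concrete, here is a minimal sketch of an arrival-time watermark that jumps forward once no record has been seen for the idle duration. It is not Beam's WatermarkPolicyFactory code: the class IdleWatermarkSketch and its methods are hypothetical, it uses java.time instead of Joda-Time, and it assumes the last-update time starts at reader creation.

{code:java}
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch, NOT Beam's WatermarkPolicyFactory: an arrival-time
// watermark that advances to (now - idleThreshold) once the source is idle.
class IdleWatermarkSketch {
    // Matches the initial value quoted in the report: new Instant(-9223372036854775L).
    static final Instant INITIAL = Instant.ofEpochMilli(-9223372036854775L);

    private final Duration idleThreshold;
    private Instant watermark = INITIAL;
    private Instant lastUpdate; // assumption: starts at reader creation time

    IdleWatermarkSketch(Duration idleThreshold, Instant createdAt) {
        this.idleThreshold = idleThreshold;
        this.lastUpdate = createdAt;
    }

    /** A record arrived: the watermark follows the latest arrival time. */
    void onRecord(Instant arrivalTime) {
        if (arrivalTime.isAfter(watermark)) {
            watermark = arrivalTime;
        }
        lastUpdate = arrivalTime;
    }

    /** If nothing arrived within the idle threshold, jump to now - threshold. */
    Instant getWatermark(Instant now) {
        Instant idleCutoff = now.minus(idleThreshold);
        if (lastUpdate.isBefore(idleCutoff) && watermark.isBefore(idleCutoff)) {
            watermark = idleCutoff;
        }
        return watermark;
    }

    public static void main(String[] args) {
        Instant created = Instant.parse("2019-08-14T07:00:00Z");
        IdleWatermarkSketch policy = new IdleWatermarkSketch(Duration.ofMinutes(2), created);
        System.out.println(policy.getWatermark(created.plusSeconds(60)));  // still the minimum value
        System.out.println(policy.getWatermark(created.plusSeconds(180))); // 2019-08-14T07:01:00Z
    }
}
{code}

With a 2-minute threshold and no records, the sketch returns the minimum value for the first two minutes after creation and now minus 2 minutes afterwards. A backlog lookup made inside that first window is the one that hits the huge period.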
The cause of the issue is as follows: the new watermark policies rely on
[WatermarkParameters|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java],
which initialises currentWatermark and eventTime to
[BoundedWindow.TIMESTAMP_MIN_VALUE|https://github.com/apache/beam/commit/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad#diff-b4964a457006b1555c7042c739b405ecR52].
As a result, the watermark is set to new Instant(-9223372036854775L) when the
KinesisReader is created. The
[period between the watermark and the current timestamp|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java#L169]
is computed with Minutes.minutesBetween, and at 153748963401 minutes it is far
larger than Integer.MAX_VALUE (2147483647), so Joda-Time throws the
ArithmeticException.
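The number in the exception can be reproduced with plain long arithmetic. This sketch assumes Minutes.minutesBetween truncates (end - start) / 60000 before narrowing to int; the narrowing is approximated with Math.toIntExact, which also throws ArithmeticException. The class name is hypothetical and the "current time" is an illustrative fixed instant.

{code:java}
import java.util.concurrent.TimeUnit;

// Hypothetical helper reproducing the overflow with plain longs (no Joda-Time).
class BacklogMinutesOverflow {
    // Evaluates to -9223372036854775, the value quoted above for the initial watermark.
    static final long TIMESTAMP_MIN_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);

    /** Whole minutes from start to end, truncated toward zero. */
    static long minutesBetween(long startMillis, long endMillis) {
        return (endMillis - startMillis) / TimeUnit.MINUTES.toMillis(1);
    }

    /** Narrows to int; assumed to behave like Joda's FieldUtils.safeToInt. */
    static int safeToInt(long value) {
        return Math.toIntExact(value); // throws ArithmeticException on overflow
    }

    public static void main(String[] args) {
        long nowMillis = 1_565_767_205_225L; // 2019-08-14T07:20:05.225Z, illustrative only
        long minutes = minutesBetween(TIMESTAMP_MIN_MILLIS, nowMillis);
        System.out.println(minutes); // 153748963401
        try {
            safeToInt(minutes);
        } catch (ArithmeticException e) {
            System.out.println("overflow: " + e.getMessage());
        }
    }
}
{code}

Any current time in 2019 gives a value around 1.5 * 10^11 minutes, roughly 70 times larger than Integer.MAX_VALUE, so the lookup fails regardless of the exact moment it runs.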
The maximum retention period of a Kinesis stream is
[7 days|https://aws.amazon.com/kinesis/data-streams/faqs/], so it should be
safe to initialise the affected watermark parameters with
new Instant().minus(MAX_KINESIS_STREAM_RETENTION_PERIOD), where
MAX_KINESIS_STREAM_RETENTION_PERIOD is a duration of 7 days.
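A minimal sketch of the proposed initialisation, using java.time in place of Joda-Time. MAX_KINESIS_STREAM_RETENTION_PERIOD follows the name suggested above; the class RetentionBoundedInitialWatermark and its methods are hypothetical and not part of KinesisIO.

{code:java}
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of the proposed fix; not KinesisIO code.
class RetentionBoundedInitialWatermark {
    // 7 days: the maximum Kinesis retention cited in the report.
    static final Duration MAX_KINESIS_STREAM_RETENTION_PERIOD = Duration.ofDays(7);

    /** Initial watermark / event time: now minus the maximum retention period. */
    static Instant initialWatermark(Instant now) {
        return now.minus(MAX_KINESIS_STREAM_RETENTION_PERIOD);
    }

    /** Minutes from the watermark to now, narrowed to int as the backlog lookup needs. */
    static int minutesSinceWatermark(Instant watermark, Instant now) {
        // Math.toIntExact throws ArithmeticException on overflow, like Joda's safeToInt.
        return Math.toIntExact(Duration.between(watermark, now).toMinutes());
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        System.out.println(minutesSinceWatermark(initialWatermark(now), now)); // 10080
    }
}
{code}

Right after reader creation the difference is 10080 minutes (7 * 24 * 60), far below Integer.MAX_VALUE, so the narrowing step no longer throws.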
Best regards,
Mateusz