[ https://issues.apache.org/jira/browse/BEAM-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16914387#comment-16914387 ]
Alexey Romanenko edited comment on BEAM-7978 at 8/23/19 3:59 PM:
-----------------------------------------------------------------
As far as I can tell, {{getTotalBacklogBytes()}} is used only in the Dataflow
runner, so it seems that no other runners are affected. I tested with the direct
and Spark runners and didn't notice any major issues (well, except for the fact
that the initial watermark is set to {{BoundedWindow.TIMESTAMP_MIN_VALUE}} and
then updated). So the fix looks simple at first sight.
[~Juraszek] Thank you for the detailed issue description. Would you be able to
test it on your side?
> ArithmeticExceptions on getting backlog bytes
> ----------------------------------------------
>
> Key: BEAM-7978
> URL: https://issues.apache.org/jira/browse/BEAM-7978
> Project: Beam
> Issue Type: Bug
> Components: io-java-kinesis
> Affects Versions: 2.14.0
> Reporter: Mateusz
> Assignee: Alexey Romanenko
> Priority: Major
>
> Hello,
> Beam 2.14.0
> (and to be more precise
> [commit|https://github.com/apache/beam/commit/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad#diff-b4964a457006b1555c7042c739b405ec])
> introduced a change in the watermark calculation in Kinesis IO that causes the
> following error:
> {code:java}
> exception: "java.lang.RuntimeException: Unknown kinesis failure, when trying to reach kinesis
>     at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:227)
>     at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:167)
>     at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:155)
>     at org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:158)
>     at org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:433)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1289)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1024)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArithmeticException: Value cannot fit in an int: 153748963401
>     at org.joda.time.field.FieldUtils.safeToInt(FieldUtils.java:229)
>     at org.joda.time.field.BaseDurationField.getDifference(BaseDurationField.java:141)
>     at org.joda.time.base.BaseSingleFieldPeriod.between(BaseSingleFieldPeriod.java:72)
>     at org.joda.time.Minutes.minutesBetween(Minutes.java:101)
>     at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getBacklogBytes$3(SimplifiedKinesisClient.java:169)
>     at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
>     ... 10 more
> {code}
> We spotted this issue on the Dataflow runner. It is problematic because the
> inability to get backlog bytes seems to result in KinesisReader being
> constantly recreated.
> The issue happens if the backlog bytes are retrieved before the watermark value
> is updated from its initial default value. An easy way to reproduce it is to
> create a pipeline with a Kinesis source for a stream to which no records are
> being put. While debugging it locally, you can observe that the watermark is set
> to a value in the past (like: "-290308-12-21T19:59:05.225Z"). After two minutes
> (the default watermark idle duration threshold is 2 minutes), the watermark is
> set to the value of
> [watermarkIdleThreshold|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyFactory.java#L110],
> so the next backlog bytes retrieval should be correct. However, as described
> before, running the pipeline on the Dataflow runner results in KinesisReader
> being closed just after creation, so the watermark never gets corrected.
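The idle-threshold recovery described above can be illustrated with a simplified model. This is a hypothetical sketch and not the actual WatermarkPolicyFactory logic. The class, its fields and its methods are made up, and java.time stands in for Joda-Time. The model assumes that if no record has arrived for longer than the 2-minute threshold, the watermark advances to "now minus the threshold" and never moves backwards. A reader that is closed before that point never gets past the initial value.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified, hypothetical model of idle-watermark advancement.
// Not Beam's WatermarkPolicyFactory implementation.
final class IdleWatermarkModel {

  // Default idle threshold mentioned in the issue (2 minutes).
  static final Duration IDLE_THRESHOLD = Duration.ofMinutes(2);

  private Instant currentWatermark;
  private Instant lastRecordArrival;

  IdleWatermarkModel(Instant initialWatermark, Instant createdAt) {
    this.currentWatermark = initialWatermark;
    this.lastRecordArrival = createdAt;
  }

  // Notes a record's arrival and moves the watermark forward to its event time.
  void onRecord(Instant eventTime, Instant arrival) {
    lastRecordArrival = arrival;
    if (eventTime.isAfter(currentWatermark)) {
      currentWatermark = eventTime;
    }
  }

  // If nothing has arrived for longer than IDLE_THRESHOLD, advance the watermark
  // to (now - IDLE_THRESHOLD); the watermark is never moved backwards.
  Instant getWatermark(Instant now) {
    Instant idleThreshold = now.minus(IDLE_THRESHOLD);
    if (lastRecordArrival.isBefore(idleThreshold) && idleThreshold.isAfter(currentWatermark)) {
      currentWatermark = idleThreshold;
    }
    return currentWatermark;
  }

  public static void main(String[] args) {
    Instant created = Instant.parse("2019-08-23T15:00:00Z");
    // Same initial value as the issue reports (-9223372036854775 ms).
    IdleWatermarkModel model =
        new IdleWatermarkModel(Instant.ofEpochMilli(-9223372036854775L), created);
    System.out.println(model.getWatermark(created.plusSeconds(60)));  // still the initial value
    System.out.println(model.getWatermark(created.plusSeconds(180))); // 2019-08-23T15:01:00Z
  }
}
```

In this model, only a reader that survives past the idle threshold ever leaves the initial value.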
> The reason for the issue is the following: the introduced watermark policies
> rely on
> [WatermarkParameters|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java],
> which initialises currentWatermark and eventTime to
> [BoundedWindow.TIMESTAMP_MIN_VALUE|https://github.com/apache/beam/commit/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad#diff-b4964a457006b1555c7042c739b405ecR52].
> This results in the watermark being set to new Instant(-9223372036854775L) when
> KinesisReader is created. The calculated
> [period between the watermark and the current timestamp|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java#L169]
> (in minutes) is then too large to fit in an int, causing the ArithmeticException
> to be thrown.
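The following JDK-only sketch reproduces the same arithmetic to make the failure concrete. It starts from the initial watermark of -9223372036854775 ms quoted above. Math.toIntExact stands in for Joda-Time's FieldUtils.safeToInt, and the class and method names are hypothetical. This is not the Beam code. From that watermark to a 2019 timestamp the difference is roughly 1.5 × 10^11 minutes, consistent with the 153748963401 in the trace. That is far above Integer.MAX_VALUE (2147483647).

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not Beam code: reproduces the int overflow seen in
// Minutes.minutesBetween(watermark, now) using only the JDK.
final class BacklogMinutesOverflow {

  // Initial watermark reported in the issue: BoundedWindow.TIMESTAMP_MIN_VALUE,
  // i.e. Long.MIN_VALUE microseconds expressed in milliseconds.
  static final long INITIAL_WATERMARK_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);

  // Whole minutes between two epoch-millisecond instants, narrowed to an int.
  // Math.toIntExact throws ArithmeticException on overflow, mirroring the
  // FieldUtils.safeToInt check in the stack trace.
  static int minutesBetween(long startMillis, long endMillis) {
    long minutes = (endMillis - startMillis) / TimeUnit.MINUTES.toMillis(1);
    return Math.toIntExact(minutes);
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    try {
      System.out.println(minutesBetween(INITIAL_WATERMARK_MILLIS, now));
    } catch (ArithmeticException e) {
      // The minute count does not fit in an int, so this branch is taken.
      System.out.println("overflow: " + e.getMessage());
    }
  }
}
```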
> The maximum retention period for Kinesis streams is
> [7 days|https://aws.amazon.com/kinesis/data-streams/faqs/], so it should be safe
> to initialise the affected watermark parameters with new
> Instant().minus(MAX_KINESIS_STREAM_RETENTION_PERIOD), where
> MAX_KINESIS_STREAM_RETENTION_PERIOD is a duration of 7 days.
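A minimal sketch of the proposed initialisation follows, assuming the 7-day retention limit stated above. The constant name MAX_KINESIS_STREAM_RETENTION_PERIOD comes from the proposal. The class, the methods and the use of java.time instead of Joda-Time are illustrative assumptions, not the actual Beam change. With this floor, the minute difference starts at 7 × 24 × 60 = 10080, well within int range.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of the proposed fix, not the actual Beam patch:
// start the watermark at "now minus the maximum Kinesis retention" instead of
// the minimum representable timestamp.
final class InitialWatermark {

  // Maximum Kinesis stream retention as stated in the issue (7 days).
  static final Duration MAX_KINESIS_STREAM_RETENTION_PERIOD = Duration.ofDays(7);

  // Initial watermark relative to the supplied clock reading.
  static Instant initialWatermark(Instant now) {
    return now.minus(MAX_KINESIS_STREAM_RETENTION_PERIOD);
  }

  // Whole minutes from the watermark to now, narrowed to int with an overflow
  // check (as Minutes.minutesBetween does); throws only if it cannot fit.
  static int minutesBehind(Instant watermark, Instant now) {
    return Math.toIntExact(Duration.between(watermark, now).toMinutes());
  }

  public static void main(String[] args) {
    Instant now = Instant.now();
    Instant watermark = initialWatermark(now);
    System.out.println("watermark = " + watermark);
    System.out.println("minutes behind = " + minutesBehind(watermark, now)); // 10080
  }
}
```

Passing the clock reading in as a parameter keeps the sketch deterministic and easy to check.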
> Remark: it seems that a similar issue was present in the past (fixed in
> 2.4). Please see
> [BEAM-3881|https://issues.apache.org/jira/browse/BEAM-3881].
> Best regards,
> Mateusz
--
This message was sent by Atlassian Jira
(v8.3.2#803003)