[ https://issues.apache.org/jira/browse/BEAM-7978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mateusz updated BEAM-7978:
--------------------------
    Description: 
Hello,

Beam 2.14.0 (more precisely, this
[commit|https://github.com/apache/beam/commit/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad#diff-b4964a457006b1555c7042c739b405ec])
introduced a change in the watermark calculation in Kinesis IO that causes the
following error:
{code:java}
exception:  "java.lang.RuntimeException: Unknown kinesis failure, when trying to reach kinesis
        at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:227)
        at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:167)
        at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:155)
        at org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:158)
        at org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:433)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1289)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1024)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArithmeticException: Value cannot fit in an int: 153748963401
        at org.joda.time.field.FieldUtils.safeToInt(FieldUtils.java:229)
        at org.joda.time.field.BaseDurationField.getDifference(BaseDurationField.java:141)
        at org.joda.time.base.BaseSingleFieldPeriod.between(BaseSingleFieldPeriod.java:72)
        at org.joda.time.Minutes.minutesBetween(Minutes.java:101)
        at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getBacklogBytes$3(SimplifiedKinesisClient.java:169)
        at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
        ... 10 more
{code}
We spotted this issue on the Dataflow runner. It is problematic because the
failure to get backlog bytes appears to result in the KinesisReader being
recreated constantly.

The issue happens when the backlog bytes are retrieved before the watermark has
been updated from its initial default value. An easy way to reproduce it is to
create a pipeline with a Kinesis source reading a stream into which no records
are being put. While debugging locally, you can observe that the watermark is
set to a value far in the past (e.g. "-290308-12-21T19:59:05.225Z"). After two
minutes (the default watermark idle duration threshold), the watermark is set
to the value of
[watermarkIdleThreshold|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyFactory.java#L110],
so the next backlog bytes retrieval should succeed. However, as described
above, on the Dataflow runner the KinesisReader is closed just after creation,
so the watermark never gets corrected.
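
To make the idle-threshold behaviour above concrete, here is a simplified sketch in plain Java. It uses java.time instead of Joda-Time so that it runs without dependencies. The class IdleWatermarkSketch and its methods are hypothetical and only approximate the arrival-time policy in WatermarkPolicyFactory; only the 2-minute threshold and the initial minimum value come from the description.

{code:java}
import java.time.Duration;
import java.time.Instant;

// Hypothetical, simplified model of an arrival-time watermark with an idle
// threshold; it is NOT Beam's WatermarkPolicyFactory implementation.
class IdleWatermarkSketch {
  // Same instant as BoundedWindow.TIMESTAMP_MIN_VALUE (Long.MIN_VALUE micros, in millis).
  static final Instant MIN_WATERMARK = Instant.ofEpochMilli(-9223372036854775L);
  // Default idle duration threshold mentioned in the report.
  static final Duration IDLE_THRESHOLD = Duration.ofMinutes(2);

  private Instant currentWatermark = MIN_WATERMARK;
  private Instant lastUpdateTime;

  IdleWatermarkSketch(Instant createdAt) {
    // Assumption: the "last update" clock starts when the reader is created.
    this.lastUpdateTime = createdAt;
  }

  // A record arrived: its arrival time is the candidate watermark.
  void onRecord(Instant arrivalTime) {
    lastUpdateTime = arrivalTime;
    if (arrivalTime.isAfter(currentWatermark)) {
      currentWatermark = arrivalTime;
    }
  }

  // Once nothing has arrived for IDLE_THRESHOLD, move the watermark to now - threshold.
  Instant getWatermark(Instant now) {
    Instant idleThreshold = now.minus(IDLE_THRESHOLD);
    if (lastUpdateTime.isBefore(idleThreshold) && idleThreshold.isAfter(currentWatermark)) {
      currentWatermark = idleThreshold;
    }
    return currentWatermark;
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2019-08-14T12:00:00Z"); // arbitrary fixed clock
    IdleWatermarkSketch policy = new IdleWatermarkSketch(start);
    // Empty stream, one minute in: still the minimum value, so a backlog query here would overflow.
    System.out.println(policy.getWatermark(start.plusSeconds(60)).equals(MIN_WATERMARK)); // prints true
    // Empty stream, three minutes in: the idle threshold has advanced the watermark.
    System.out.println(policy.getWatermark(start.plusSeconds(180))); // prints 2019-08-14T12:01:00Z
  }
}
{code}

The window in which the bug bites is the first two minutes: if the reader is closed and recreated before getWatermark is ever called past the threshold, the watermark never leaves MIN_WATERMARK.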

The cause of the issue is as follows. The newly introduced watermark policies
rely on
[WatermarkParameters|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java],
which initialises currentWatermark and eventTime to
[BoundedWindow.TIMESTAMP_MIN_VALUE|https://github.com/apache/beam/commit/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad#diff-b4964a457006b1555c7042c739b405ecR52].
As a result, the watermark is set to new Instant(-9223372036854775L) when the
KinesisReader is created. The calculated
[period between the watermark and the current timestamp|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java#L169]
is computed in minutes (Minutes.minutesBetween in the stack trace above) and is
too large to fit in an int, so the ArithmeticException is thrown.
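
The overflow can be reproduced without Beam or Joda-Time. In this sketch, Math.toIntExact stands in for Joda's FieldUtils.safeToInt, and the minimum watermark is derived the same way as BoundedWindow.TIMESTAMP_MIN_VALUE (Long.MIN_VALUE microseconds converted to milliseconds). The class name BacklogPeriodOverflow and the fixed "now" are assumptions made for illustration.

{code:java}
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Reproduces the "Value cannot fit in an int" arithmetic with JDK classes only.
// Math.toIntExact plays the role of Joda-Time's FieldUtils.safeToInt.
class BacklogPeriodOverflow {
  // Derived like BoundedWindow.TIMESTAMP_MIN_VALUE: Long.MIN_VALUE microseconds as millis.
  static final long MIN_WATERMARK_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);

  // Whole minutes between two epoch-millisecond timestamps, narrowed to int
  // the way Minutes.minutesBetween narrows its result.
  static int minutesBetween(long startMillis, long endMillis) {
    long minutes = (endMillis - startMillis) / TimeUnit.MINUTES.toMillis(1);
    return Math.toIntExact(minutes); // throws ArithmeticException if it does not fit
  }

  public static void main(String[] args) {
    long now = Instant.parse("2019-08-14T12:00:00Z").toEpochMilli(); // arbitrary "now"
    System.out.println(MIN_WATERMARK_MILLIS); // prints -9223372036854775
    try {
      minutesBetween(MIN_WATERMARK_MILLIS, now);
    } catch (ArithmeticException e) {
      System.out.println("overflow: " + e.getMessage());
    }
    // A watermark only 7 days old is harmless.
    System.out.println(minutesBetween(now - TimeUnit.DAYS.toMillis(7), now)); // prints 10080
  }
}
{code}

Any "now" in 2019 gives roughly 1.5e11 minutes, far above Integer.MAX_VALUE (2147483647), which matches the magnitude of 153748963401 in the stack trace.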

The maximum retention on Kinesis streams is
[7 days|https://aws.amazon.com/kinesis/data-streams/faqs/], so it should be safe
to initialise the affected watermark parameters with
new Instant().minus(MAX_KINESIS_STREAM_RETENTION_PERIOD), where
MAX_KINESIS_STREAM_RETENTION_PERIOD is a duration of 7 days.
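
A minimal sketch of the proposed initialisation follows, again with java.time rather than the Joda-Time types Beam actually uses. RetentionBoundedWatermark and initialWatermark are hypothetical names, not Beam API. The 7-day constant is the figure given above; Kinesis has since added longer retention (up to 365 days), so it may be better treated as a parameter.

{code:java}
import java.time.Duration;
import java.time.Instant;

// Sketch of the proposed fix: start the watermark at
// now - MAX_KINESIS_STREAM_RETENTION_PERIOD instead of the minimum timestamp.
// Class and method names are hypothetical.
class RetentionBoundedWatermark {
  // Maximum Kinesis retention as stated in the report.
  static final Duration MAX_KINESIS_STREAM_RETENTION_PERIOD = Duration.ofDays(7);

  static Instant initialWatermark(Instant now) {
    return now.minus(MAX_KINESIS_STREAM_RETENTION_PERIOD);
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2019-08-14T12:00:00Z"); // arbitrary fixed clock
    Instant watermark = initialWatermark(now);
    long minutes = Duration.between(watermark, now).toMinutes();
    // The backlog period now fits comfortably in an int.
    System.out.println(watermark + " -> " + Math.toIntExact(minutes) + " minutes");
    // prints 2019-08-07T12:00:00Z -> 10080 minutes
  }
}
{code}

Even a 365-day bound is only 525600 minutes, so bounding the initial watermark by the retention period keeps the int conversion safe regardless of which limit applies.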

Remark: it seems a similar issue was present in the past (fixed in 2.4).
Please look into [the ticket|https://issues.apache.org/jira/browse/BEAM-3881].

Best regards,
 Mateusz

  was:
Hello,

Dataflow 2.14.0 (more precisely, this
[commit|https://github.com/apache/beam/commit/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad#diff-b4964a457006b1555c7042c739b405ec])
introduced a change in the watermark calculation in Kinesis IO that causes the
following error:
{code:java}
exception:  "java.lang.RuntimeException: Unknown kinesis failure, when trying to reach kinesis
        at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:227)
        at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:167)
        at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:155)
        at org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:158)
        at org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:433)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1289)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
        at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1024)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArithmeticException: Value cannot fit in an int: 153748963401
        at org.joda.time.field.FieldUtils.safeToInt(FieldUtils.java:229)
        at org.joda.time.field.BaseDurationField.getDifference(BaseDurationField.java:141)
        at org.joda.time.base.BaseSingleFieldPeriod.between(BaseSingleFieldPeriod.java:72)
        at org.joda.time.Minutes.minutesBetween(Minutes.java:101)
        at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getBacklogBytes$3(SimplifiedKinesisClient.java:169)
        at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
        ... 10 more
{code}
We spotted this issue on the Dataflow runner. It is problematic because the
failure to get backlog bytes appears to result in the KinesisReader being
recreated constantly.

The issue happens when the backlog bytes are retrieved before the watermark has
been updated from its initial default value. An easy way to reproduce it is to
create a pipeline with a Kinesis source reading a stream into which no records
are being put. While debugging locally, you can observe that the watermark is
set to a value far in the past (e.g. "-290308-12-21T19:59:05.225Z"). After two
minutes (the default watermark idle duration threshold), the watermark is set
to the value of
[watermarkIdleThreshold|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyFactory.java#L110],
so the next backlog bytes retrieval should succeed. However, as described
above, on the Dataflow runner the KinesisReader is closed just after creation,
so the watermark never gets corrected.

The cause of the issue is as follows. The newly introduced watermark policies
rely on
[WatermarkParameters|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java],
which initialises currentWatermark and eventTime to
[BoundedWindow.TIMESTAMP_MIN_VALUE|https://github.com/apache/beam/commit/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad#diff-b4964a457006b1555c7042c739b405ecR52].
As a result, the watermark is set to new Instant(-9223372036854775L) when the
KinesisReader is created. The calculated
[period between the watermark and the current timestamp|https://github.com/apache/beam/blob/9fe0a0387ad1f894ef02acf32edbc9623a3b13ad/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java#L169]
is computed in minutes (Minutes.minutesBetween in the stack trace above) and is
too large to fit in an int, so the ArithmeticException is thrown.

The maximum retention on Kinesis streams is
[7 days|https://aws.amazon.com/kinesis/data-streams/faqs/], so it should be safe
to initialise the affected watermark parameters with
new Instant().minus(MAX_KINESIS_STREAM_RETENTION_PERIOD), where
MAX_KINESIS_STREAM_RETENTION_PERIOD is a duration of 7 days.

Remark: it seems a similar issue was present in the past (fixed in 2.4).
Please look into [the ticket|https://issues.apache.org/jira/browse/BEAM-3881].

Best regards,
 Mateusz


> ArithmeticExceptions on getting backlog bytes 
> ----------------------------------------------
>
>                 Key: BEAM-7978
>                 URL: https://issues.apache.org/jira/browse/BEAM-7978
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.14.0
>            Reporter: Mateusz
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
