[
https://issues.apache.org/jira/browse/BEAM-3881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412213#comment-16412213
]
Kevin Peterson commented on BEAM-3881:
--------------------------------------
I think I've narrowed down the conditions this occurs to when reading a backlog
of messages. Once the pipeline has fully caught up (or is started with a trim
horizon of LATEST), I don't see the error anymore.
Happens repeatedly, but not continuously, when reading older messages though.
This also occurs on pipelines with throughputs large (100k elements/s, 100
workers) and small (100 elements/s, 1 worker). I'll see a few exceptions of
this nature over a 2-3 min period, and then no exceptions for 15-30 min, and
then repeat.
> Failure reading backlog in KinesisIO
> ------------------------------------
>
> Key: BEAM-3881
> URL: https://issues.apache.org/jira/browse/BEAM-3881
> Project: Beam
> Issue Type: Bug
> Components: io-java-kinesis
> Reporter: Kevin Peterson
> Assignee: Alexey Romanenko
> Priority: Major
>
> I'm gettingĀ an error when reading from Kinesis in my pipeline. Using Beam
> v2.3, running on Google Cloud Dataflow.
> I'm constructing the source via:
> {code:java}
> KinesisIO.Read read = KinesisIO
> .read()
> .withAWSClientsProvider(
> configuration.getAwsAccessKeyId(),
> configuration.getAwsSecretAccessKey(),
> region)
> .withStreamName(configuration.getKinesisStream())
> .withUpToDateThreshold(Duration.standardMinutes(30))
> .withInitialTimestampInStream(configuration.getStartTime());
> {code}
> The exception is:
> {noformat}
> Mar 19, 2018 12:54:41 PM
> org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
> SEVERE: 2018-03-19T19:54:53.010Z: (2896b8774de760ec):
> java.lang.RuntimeException: Unknown kinesis failure, when trying to reach
> kinesis
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:223)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:161)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:150)
> org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:200)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:398)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1199)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:940)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArithmeticException: Value cannot fit in an int:
> 153748225435
> org.joda.time.field.FieldUtils.safeToInt(FieldUtils.java:206)
> org.joda.time.field.BaseDurationField.getDifference(BaseDurationField.java:141)
> org.joda.time.base.BaseSingleFieldPeriod.between(BaseSingleFieldPeriod.java:72)
> org.joda.time.Minutes.minutesBetween(Minutes.java:101)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getBacklogBytes$3(SimplifiedKinesisClient.java:163)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:205)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:161)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:150)
> org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:200)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:398)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1199)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:940)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745){noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)