[https://issues.apache.org/jira/browse/BEAM-3881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412301#comment-16412301]
Kevin Peterson commented on BEAM-3881:
--------------------------------------
I added a log statement
[here|https://github.com/apache/beam/blob/master/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java#L163]
and ran the pipeline again.
It looks like the {{CountSince}} input value is nonsensical:
{noformat}
BEAM-3881 --- CountSince: -290308-12-21T19:59:05.225Z, CountTo: 2018-03-24T00:07:12.818Z
{noformat}
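For context: -290308-12-21T19:59:05.225Z appears to be {{BoundedWindow.TIMESTAMP_MIN_VALUE}} (Long.MIN_VALUE microseconds, truncated to milliseconds). The number of minutes from that instant to 2018 is roughly 1.5 x 10^11, far beyond {{Integer.MAX_VALUE}} (2147483647), which is consistent with the 153748225435 in the {{ArithmeticException}} quoted in the issue description. Here is a minimal sketch of the arithmetic. It uses {{java.time}} instead of Joda-Time, the class {{BacklogOverflow}} is hypothetical, and the constant reflects my assumption about how Beam defines the minimum timestamp:

{code:java}
import java.time.Duration;
import java.time.Instant;

// Hypothetical class: reproduces the backlog overflow with java.time, not Joda-Time.
public class BacklogOverflow {
    // Assumption: mirrors BoundedWindow.TIMESTAMP_MIN_VALUE, i.e.
    // Long.MIN_VALUE microseconds truncated to milliseconds.
    static final Instant TIMESTAMP_MIN_VALUE = Instant.ofEpochMilli(Long.MIN_VALUE / 1000);

    // Whole minutes between two instants, kept as a long so nothing overflows here.
    static long minutesBetween(Instant from, Instant to) {
        return Duration.between(from, to).toMinutes();
    }

    public static void main(String[] args) {
        Instant countTo = Instant.parse("2018-03-24T00:07:12.818Z");
        long minutes = minutesBetween(TIMESTAMP_MIN_VALUE, countTo);
        System.out.println("CountSince: " + TIMESTAMP_MIN_VALUE + ", minutes: " + minutes);
        try {
            // Narrowing to int fails, much like Joda's Minutes.minutesBetween does.
            System.out.println("as int: " + Math.toIntExact(minutes));
        } catch (ArithmeticException e) {
            System.out.println("overflow: " + e.getMessage());
        }
    }
}
{code}

Keeping the difference in a {{long}} is fine; the failure only happens once the value is forced into an {{int}}.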
Digging in a bit more, I think this was actually fixed in
[https://github.com/apache/beam/commit/994c7f3f714d51270f71234a4f5dba752a70766a],
which changed how the watermark was initialized:
{noformat}
- private Instant lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ private Instant lastWatermark =
+     Instant.now().minus(MAX_KINESIS_STREAM_RETENTION_PERIOD);
{noformat}
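Here is a minimal sketch of why the new initialization should avoid the overflow, again using {{java.time}} rather than Joda-Time. {{WatermarkInit}} and its methods are hypothetical, and the 7-day value is my assumption about {{MAX_KINESIS_STREAM_RETENTION_PERIOD}}, not copied from the commit:

{code:java}
import java.time.Duration;
import java.time.Instant;

// Hypothetical class comparing the old and new initial watermark.
public class WatermarkInit {
    // Assumption: the retention constant is 7 days (Kinesis' maximum retention in 2018).
    static final Duration MAX_RETENTION = Duration.ofDays(7);

    // Old behaviour: start from the minimum representable timestamp.
    static Instant oldInitialWatermark() {
        return Instant.ofEpochMilli(Long.MIN_VALUE / 1000);
    }

    // New behaviour: start from "now" minus the retention period.
    static Instant newInitialWatermark(Instant now) {
        return now.minus(MAX_RETENTION);
    }

    // Narrows the minute count to int, as the backlog call effectively does.
    static int backlogMinutes(Instant countSince, Instant countTo) {
        return Math.toIntExact(Duration.between(countSince, countTo).toMinutes());
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2018-03-24T00:07:12.818Z");
        System.out.println("new: " + backlogMinutes(newInitialWatermark(now), now) + " minutes");
        try {
            backlogMinutes(oldInitialWatermark(), now);
        } catch (ArithmeticException e) {
            System.out.println("old: overflow (" + e.getMessage() + ")");
        }
    }
}
{code}

With the new initialization, the lookback is bounded by the retention period (10080 minutes for 7 days), so it fits in an {{int}} with plenty of room.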
That change should be in v2.4, so I'll upgrade the pipeline and close the ticket
if that fixes the error.
> Failure reading backlog in KinesisIO
> ------------------------------------
>
> Key: BEAM-3881
> URL: https://issues.apache.org/jira/browse/BEAM-3881
> Project: Beam
> Issue Type: Bug
> Components: io-java-kinesis
> Reporter: Kevin Peterson
> Assignee: Alexey Romanenko
> Priority: Major
>
> I'm getting an error when reading from Kinesis in my pipeline. I'm using Beam
> v2.3, running on Google Cloud Dataflow.
> I'm constructing the source via:
> {code:java}
> KinesisIO.Read read = KinesisIO
>     .read()
>     .withAWSClientsProvider(
>         configuration.getAwsAccessKeyId(),
>         configuration.getAwsSecretAccessKey(),
>         region)
>     .withStreamName(configuration.getKinesisStream())
>     .withUpToDateThreshold(Duration.standardMinutes(30))
>     .withInitialTimestampInStream(configuration.getStartTime());
> {code}
> The exception is:
> {noformat}
> Mar 19, 2018 12:54:41 PM
> org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
> SEVERE: 2018-03-19T19:54:53.010Z: (2896b8774de760ec):
> java.lang.RuntimeException: Unknown kinesis failure, when trying to reach kinesis
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:223)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:161)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:150)
> org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:200)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:398)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1199)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:940)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArithmeticException: Value cannot fit in an int: 153748225435
> org.joda.time.field.FieldUtils.safeToInt(FieldUtils.java:206)
> org.joda.time.field.BaseDurationField.getDifference(BaseDurationField.java:141)
> org.joda.time.base.BaseSingleFieldPeriod.between(BaseSingleFieldPeriod.java:72)
> org.joda.time.Minutes.minutesBetween(Minutes.java:101)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getBacklogBytes$3(SimplifiedKinesisClient.java:163)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:205)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:161)
> org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:150)
> org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:200)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:398)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1199)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:940)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {noformat}